3434import com .datastax .driver .core .utils .MoreFutures ;
3535import com .datastax .driver .core .utils .MoreObjects ;
3636import com .google .common .annotations .VisibleForTesting ;
37+ import com .google .common .collect .ImmutableSet ;
3738import com .google .common .collect .Iterators ;
3839import com .google .common .util .concurrent .FutureCallback ;
3940import com .google .common .util .concurrent .Futures ;
@@ -86,6 +87,91 @@ class ControlConnection implements Connection.Owner {
8687 private static final String SELECT_SCHEMA_LOCAL =
8788 "SELECT schema_version, host_id FROM system.local WHERE key='local'" ;
8889
90+ // IMPORTANT: Every column read from system.local rows — in updateInfo(),
91+ // refreshNodeListAndTokenMap(), isValidPeer(), and DefaultEndPointFactory — MUST be listed here.
92+ // If a new column read is added anywhere that consumes a system table row, add it to the
93+ // appropriate set below, otherwise it will be silently excluded from projected queries.
94+ @ VisibleForTesting
95+ static final ImmutableSet <String > LOCAL_COLUMNS_OF_INTEREST =
96+ ImmutableSet .of (
97+ "cluster_name" ,
98+ "partitioner" ,
99+ "data_center" ,
100+ "rack" ,
101+ "release_version" ,
102+ "native_address" ,
103+ "native_port" ,
104+ "native_transport_address" ,
105+ "native_transport_port" ,
106+ "native_transport_port_ssl" ,
107+ "rpc_address" ,
108+ "broadcast_address" ,
109+ "broadcast_port" ,
110+ "listen_address" ,
111+ "listen_port" ,
112+ "tokens" ,
113+ "host_id" ,
114+ "schema_version" ,
115+ "workload" ,
116+ "graph" ,
117+ "dse_version" );
118+
119+ // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above.
120+ // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(),
121+ // isValidPeer(), and DefaultEndPointFactory.create() from system.peers rows.
122+ // Columns that are absent from the actual server schema are silently excluded by
123+ // intersectWithNeeded(), so listing extra columns here is safe.
124+ @ VisibleForTesting
125+ static final ImmutableSet <String > PEERS_COLUMNS_OF_INTEREST =
126+ ImmutableSet .of (
127+ "peer" ,
128+ "peer_port" , // peers_v2 column; harmless to list here — absent on peers, excluded safely
129+ "rpc_address" ,
130+ "data_center" ,
131+ "rack" ,
132+ "release_version" ,
133+ "tokens" ,
134+ "listen_address" ,
135+ "listen_port" ,
136+ "host_id" ,
137+ "schema_version" ,
138+ "native_address" , // may appear on some server variants; guarded by contains() in code
139+ "native_port" , // same
140+ "native_transport_address" ,
141+ "native_transport_port" ,
142+ "native_transport_port_ssl" ,
143+ "workload" ,
144+ "graph" ,
145+ "dse_version" );
146+
147+ // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above.
148+ // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(),
149+ // isValidPeer(), and DefaultEndPointFactory.create() from system.peers_v2 rows.
150+ // Columns that are absent from the actual server schema are silently excluded by
151+ // intersectWithNeeded(), so listing extra columns here is safe.
152+ @ VisibleForTesting
153+ static final ImmutableSet <String > PEERS_V2_COLUMNS_OF_INTEREST =
154+ ImmutableSet .of (
155+ "peer" ,
156+ "peer_port" ,
157+ "native_address" ,
158+ "native_port" ,
159+ "data_center" ,
160+ "rack" ,
161+ "release_version" ,
162+ "tokens" ,
163+ "host_id" ,
164+ "schema_version" ,
165+ "workload" ,
166+ "graph" ,
167+ "dse_version" ,
168+ "listen_address" ,
169+ "listen_port" ,
170+ "rpc_address" , // legacy; guarded by contains() in code — harmless if absent
171+ "native_transport_address" , // same
172+ "native_transport_port" , // same
173+ "native_transport_port_ssl" ); // same
174+
89175 private static final VersionNumber _3_11 = VersionNumber .parse ("3.11.0" );
90176
91177 @ VisibleForTesting
@@ -102,10 +188,30 @@ class ControlConnection implements Connection.Owner {
102188 // from here on out.
103189 private volatile boolean isPeersV2 = true ;
104190
191+ // Column projection caches. null = uninitialized: the first query to each system table issues
192+ // SELECT * to discover which columns the server exposes, then subsequent queries project only
193+ // the intersection of those columns with the corresponding *_COLUMNS_OF_INTEREST set.
194+ // Reset to null on every new connection so that the new server's schema is re-discovered.
195+ private volatile Set <String > localColumns = null ;
196+ private volatile Set <String > peersColumns = null ;
197+ private volatile Set <String > peersV2Columns = null ;
198+
105199 public ControlConnection (Cluster .Manager manager ) {
106200 this .cluster = manager ;
107201 }
108202
203+ /**
204+ * Resets the projected-column caches so that the next query to each system table sends {@code
205+ * SELECT *} and re-discovers available columns. Intended for use in tests that clear Scassandra
206+ * primes between driver operations.
207+ */
208+ @ VisibleForTesting
209+ void resetColumnCaches () {
210+ localColumns = null ;
211+ peersColumns = null ;
212+ peersV2Columns = null ;
213+ }
214+
109215 // Only for the initial connection. Does not schedule retries if it fails
110216 void connect () throws UnsupportedProtocolVersionException {
111217 if (isShutdown ) return ;
@@ -234,6 +340,11 @@ private void signalError() {
234340 private void setNewConnection (Connection newConnection ) {
235341 Host .statesLogger .debug ("[Control connection] established to {}" , newConnection .endPoint );
236342 newConnection .setOwner (this );
343+ // Reset column caches so the new connection triggers a fresh SELECT * to discover which
344+ // columns the server exposes before switching to projected queries.
345+ localColumns = null ;
346+ peersColumns = null ;
347+ peersV2Columns = null ;
237348 Connection old = connectionRef .getAndSet (newConnection );
238349 if (old != null && !old .isClosed ()) old .closeAsync ().force ();
239350 }
@@ -326,6 +437,14 @@ private Connection tryConnect(Host host, boolean isInitialConnection)
326437 ProtocolEvent .Type .SCHEMA_CHANGE );
327438 connection .write (new Requests .Register (evs ));
328439
440+ // Reset column caches so refreshNodeListAndTokenMap() uses SELECT * to rediscover
441+ // which columns this server exposes, rather than a projected query built for the
442+ // previous connection's server. setNewConnection() also resets these, but it is
443+ // called after tryConnect() returns — too late to protect the queries below.
444+ localColumns = null ;
445+ peersColumns = null ;
446+ peersV2Columns = null ;
447+
329448 // We need to refresh the node list first so we know about the cassandra version of
330449 // the node we're connecting to.
331450 // This will create the token map for the first time, but it will be incomplete
@@ -491,26 +610,54 @@ private Row fetchNodeInfo(Host host, Connection c)
491610 if (isConnectedHost || host .getBroadcastSocketAddress () != null ) {
492611 String query ;
493612 if (isConnectedHost ) {
494- query = SELECT_LOCAL ;
613+ query =
614+ localColumns == null
615+ ? SELECT_LOCAL
616+ : buildProjectedQuery ("system.local" , localColumns , "key='local'" );
495617 } else {
496618 InetSocketAddress broadcastAddress = host .getBroadcastSocketAddress ();
497- query =
498- isPeersV2
499- ? SELECT_PEERS_V2
500- + " WHERE peer='"
501- + broadcastAddress .getAddress ().getHostAddress ()
502- + "' AND peer_port="
503- + broadcastAddress .getPort ()
504- : SELECT_PEERS
505- + " WHERE peer='"
506- + broadcastAddress .getAddress ().getHostAddress ()
507- + "'" ;
619+ // Always use SELECT * for single-row WHERE lookups. Projected queries are only used for
620+ // full-table scans via selectPeersFuture(), where the cache is guaranteed to be warm and
621+ // every node has the projected full-scan prime registered. For WHERE lookups the control
622+ // connection may query a node that was never restarted (and therefore still carries only
623+ // the original SELECT * prime from init time), so projecting here risks a cache miss.
624+ if (isPeersV2 ) {
625+ String whereClause =
626+ "peer='"
627+ + broadcastAddress .getAddress ().getHostAddress ()
628+ + "' AND peer_port="
629+ + broadcastAddress .getPort ();
630+ query = SELECT_PEERS_V2 + " WHERE " + whereClause ;
631+ } else {
632+ String whereClause = "peer='" + broadcastAddress .getAddress ().getHostAddress () + "'" ;
633+ query = SELECT_PEERS + " WHERE " + whereClause ;
634+ }
508635 }
509636 DefaultResultSetFuture future =
510637 new DefaultResultSetFuture (null , cluster .protocolVersion (), new Requests .Query (query ));
511638 c .write (future );
512- Row row = future .get ().one ();
639+ ResultSet rs = future .get ();
640+ Row row = rs .one ();
641+ // Populate the column cache on first successful WHERE lookup so that subsequent full-table
642+ // scans via selectPeersFuture() can send projected queries. Only populate when a row is
643+ // found: if the WHERE returned empty we are about to fall through to the full-scan path,
644+ // which has its own cache-population logic; populating here on an empty result would cause
645+ // selectPeersFuture() to send a projected query before it has verified the server supports
646+ // the projected columns.
513647 if (row != null ) {
648+ if (isConnectedHost ) {
649+ if (localColumns == null ) {
650+ localColumns = intersectWithNeeded (rs , LOCAL_COLUMNS_OF_INTEREST );
651+ }
652+ } else if (isPeersV2 ) {
653+ if (peersV2Columns == null ) {
654+ peersV2Columns = intersectWithNeeded (rs , PEERS_V2_COLUMNS_OF_INTEREST );
655+ }
656+ } else {
657+ if (peersColumns == null ) {
658+ peersColumns = intersectWithNeeded (rs , PEERS_COLUMNS_OF_INTEREST );
659+ }
660+ }
514661 return row ;
515662 } else {
516663 InetSocketAddress address = host .getBroadcastSocketAddress ();
@@ -717,9 +864,13 @@ private static void updateLocationInfo(
717864 */
718865 private ListenableFuture <ResultSet > selectPeersFuture (final Connection connection ) {
719866 if (isPeersV2 ) {
867+ String peersV2Query =
868+ peersV2Columns == null
869+ ? SELECT_PEERS_V2
870+ : buildProjectedQuery ("system.peers_v2" , peersV2Columns , null );
720871 DefaultResultSetFuture peersV2Future =
721872 new DefaultResultSetFuture (
722- null , cluster .protocolVersion (), new Requests .Query (SELECT_PEERS_V2 ));
873+ null , cluster .protocolVersion (), new Requests .Query (peersV2Query ));
723874 connection .write (peersV2Future );
724875 final SettableFuture <ResultSet > peersFuture = SettableFuture .create ();
725876 // if peers v2 query fails, query peers table instead.
@@ -729,6 +880,9 @@ private ListenableFuture<ResultSet> selectPeersFuture(final Connection connectio
729880
730881 @ Override
731882 public void onSuccess (ResultSet result ) {
883+ if (peersV2Columns == null ) {
884+ peersV2Columns = intersectWithNeeded (result , PEERS_V2_COLUMNS_OF_INTEREST );
885+ }
732886 peersFuture .set (result );
733887 }
734888
@@ -742,6 +896,9 @@ public void onFailure(Throwable t) {
742896 || (t instanceof ServerError
743897 && t .getMessage ().contains ("Unknown keyspace/cf pair (system.peers_v2)" ))) {
744898 isPeersV2 = false ;
899+ // Also reset the peers cache so the first system.peers query issues SELECT *
900+ // to discover which columns that table exposes on this server.
901+ peersColumns = null ;
745902 MoreFutures .propagateFuture (peersFuture , selectPeersFuture (connection ));
746903 } else {
747904 peersFuture .setException (t );
@@ -751,14 +908,73 @@ public void onFailure(Throwable t) {
751908 MoreExecutors .directExecutor ());
752909 return peersFuture ;
753910 } else {
754- DefaultResultSetFuture peersFuture =
911+ String peersQuery =
912+ peersColumns == null
913+ ? SELECT_PEERS
914+ : buildProjectedQuery ("system.peers" , peersColumns , null );
915+ DefaultResultSetFuture rawFuture =
755916 new DefaultResultSetFuture (
756- null , cluster .protocolVersion (), new Requests .Query (SELECT_PEERS ));
757- connection .write (peersFuture );
917+ null , cluster .protocolVersion (), new Requests .Query (peersQuery ));
918+ connection .write (rawFuture );
919+ final SettableFuture <ResultSet > peersFuture = SettableFuture .create ();
920+ Futures .addCallback (
921+ rawFuture ,
922+ new FutureCallback <ResultSet >() {
923+ @ Override
924+ public void onSuccess (ResultSet result ) {
925+ if (peersColumns == null ) {
926+ peersColumns = intersectWithNeeded (result , PEERS_COLUMNS_OF_INTEREST );
927+ }
928+ peersFuture .set (result );
929+ }
930+
931+ @ Override
932+ public void onFailure (Throwable t ) {
933+ peersFuture .setException (t );
934+ }
935+ },
936+ MoreExecutors .directExecutor ());
758937 return peersFuture ;
759938 }
760939 }
761940
941+ /**
942+ * Returns the intersection of the columns returned by the server (from {@code rs}) with the given
943+ * {@code needed} set, or {@code null} if the intersection is empty. The result is used to cache
944+ * projected column lists so subsequent queries fetch only what the driver actually reads. A
945+ * {@code null} return keeps the cache in the "uninitialized" sentinel state, ensuring the driver
946+ * continues issuing {@code SELECT *} rather than generating an invalid empty-column projection.
947+ */
948+ @ VisibleForTesting
949+ static Set <String > intersectWithNeeded (ResultSet rs , ImmutableSet <String > needed ) {
950+ ImmutableSet .Builder <String > result = ImmutableSet .builder ();
951+ for (ColumnDefinitions .Definition def : rs .getColumnDefinitions ()) {
952+ if (needed .contains (def .getName ())) {
953+ result .add (def .getName ());
954+ }
955+ }
956+ ImmutableSet <String > built = result .build ();
957+ return built .isEmpty () ? null : built ;
958+ }
959+
960+ /**
961+ * Builds a {@code SELECT col1, col2, ... FROM table [WHERE whereClause]} query string from the
962+ * given projected column set. {@code whereClause} may be {@code null} for table-wide scans.
963+ */
964+ @ VisibleForTesting
965+ static String buildProjectedQuery (String table , Set <String > columns , String whereClause ) {
966+ StringBuilder sb = new StringBuilder ("SELECT " );
967+ boolean first = true ;
968+ for (String col : columns ) {
969+ if (!first ) sb .append (", " );
970+ sb .append (col );
971+ first = false ;
972+ }
973+ sb .append (" FROM " ).append (table );
974+ if (whereClause != null ) sb .append (" WHERE " ).append (whereClause );
975+ return sb .toString ();
976+ }
977+
762978 private void refreshNodeListAndTokenMap (
763979 final Connection connection ,
764980 final Cluster .Manager cluster ,
@@ -772,9 +988,12 @@ private void refreshNodeListAndTokenMap(
772988
773989 // Make sure we're up to date on nodes and tokens
774990
991+ String localQuery =
992+ localColumns == null
993+ ? SELECT_LOCAL
994+ : buildProjectedQuery ("system.local" , localColumns , "key='local'" );
775995 DefaultResultSetFuture localFuture =
776- new DefaultResultSetFuture (
777- null , cluster .protocolVersion (), new Requests .Query (SELECT_LOCAL ));
996+ new DefaultResultSetFuture (null , cluster .protocolVersion (), new Requests .Query (localQuery ));
778997 ListenableFuture <ResultSet > peersFuture = selectPeersFuture (connection );
779998 connection .write (localFuture );
780999
@@ -783,7 +1002,11 @@ private void refreshNodeListAndTokenMap(
7831002 Map <Host , Set <Token >> tokenMap = new HashMap <Host , Set <Token >>();
7841003
7851004 // Update cluster name, DC and rack for the one node we are connected to
786- Row localRow = localFuture .get ().one ();
1005+ ResultSet localRs = localFuture .get ();
1006+ if (localColumns == null ) {
1007+ localColumns = intersectWithNeeded (localRs , LOCAL_COLUMNS_OF_INTEREST );
1008+ }
1009+ Row localRow = localRs .one ();
7871010 if (localRow == null ) {
7881011 throw new IllegalStateException (
7891012 String .format (
0 commit comments