Skip to content

Commit 512401d

Browse files
committed
refactor: extract column projection logic into SystemColumnProjection
Move the three *_COLUMNS_OF_INTEREST constants, the three volatile cache fields (localColumns, peersColumns, peersV2Columns), and the two static helpers (intersectWithNeeded, buildProjectedQuery) out of ControlConnection and into a new package-private SystemColumnProjection class. ControlConnection now holds a single 'projection' instance and delegates all projection state and query-building to it. resetColumnCaches() remains on ControlConnection as a @VisibleForTesting thin wrapper over projection.reset(), used by ControlConnectionTest between Scassandra prime clears. ControlConnectionUnitTest is updated to test SystemColumnProjection directly; ScassandraCluster references updated from ControlConnection.* to SystemColumnProjection.*.
1 parent 7ba90d7 commit 512401d

4 files changed

Lines changed: 300 additions & 196 deletions

File tree

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 32 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import com.datastax.driver.core.utils.MoreFutures;
3535
import com.datastax.driver.core.utils.MoreObjects;
3636
import com.google.common.annotations.VisibleForTesting;
37-
import com.google.common.collect.ImmutableSet;
3837
import com.google.common.collect.Iterators;
3938
import com.google.common.util.concurrent.FutureCallback;
4039
import com.google.common.util.concurrent.Futures;
@@ -87,91 +86,6 @@ class ControlConnection implements Connection.Owner {
8786
private static final String SELECT_SCHEMA_LOCAL =
8887
"SELECT schema_version, host_id FROM system.local WHERE key='local'";
8988

90-
// IMPORTANT: Every column read from system.local rows — in updateInfo(),
91-
// refreshNodeListAndTokenMap(), isValidPeer(), and DefaultEndPointFactory — MUST be listed here.
92-
// If a new column read is added anywhere that consumes a system table row, add it to the
93-
// appropriate set below, otherwise it will be silently excluded from projected queries.
94-
@VisibleForTesting
95-
static final ImmutableSet<String> LOCAL_COLUMNS_OF_INTEREST =
96-
ImmutableSet.of(
97-
"cluster_name",
98-
"partitioner",
99-
"data_center",
100-
"rack",
101-
"release_version",
102-
"native_address",
103-
"native_port",
104-
"native_transport_address",
105-
"native_transport_port",
106-
"native_transport_port_ssl",
107-
"rpc_address",
108-
"broadcast_address",
109-
"broadcast_port",
110-
"listen_address",
111-
"listen_port",
112-
"tokens",
113-
"host_id",
114-
"schema_version",
115-
"workload",
116-
"graph",
117-
"dse_version");
118-
119-
// IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above.
120-
// Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(),
121-
// isValidPeer(), and DefaultEndPointFactory.create() from system.peers rows.
122-
// Columns that are absent from the actual server schema are silently excluded by
123-
// intersectWithNeeded(), so listing extra columns here is safe.
124-
@VisibleForTesting
125-
static final ImmutableSet<String> PEERS_COLUMNS_OF_INTEREST =
126-
ImmutableSet.of(
127-
"peer",
128-
"peer_port", // peers_v2 column; harmless to list here — absent on peers, excluded safely
129-
"rpc_address",
130-
"data_center",
131-
"rack",
132-
"release_version",
133-
"tokens",
134-
"listen_address",
135-
"listen_port",
136-
"host_id",
137-
"schema_version",
138-
"native_address", // may appear on some server variants; guarded by contains() in code
139-
"native_port", // same
140-
"native_transport_address",
141-
"native_transport_port",
142-
"native_transport_port_ssl",
143-
"workload",
144-
"graph",
145-
"dse_version");
146-
147-
// IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above.
148-
// Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(),
149-
// isValidPeer(), and DefaultEndPointFactory.create() from system.peers_v2 rows.
150-
// Columns that are absent from the actual server schema are silently excluded by
151-
// intersectWithNeeded(), so listing extra columns here is safe.
152-
@VisibleForTesting
153-
static final ImmutableSet<String> PEERS_V2_COLUMNS_OF_INTEREST =
154-
ImmutableSet.of(
155-
"peer",
156-
"peer_port",
157-
"native_address",
158-
"native_port",
159-
"data_center",
160-
"rack",
161-
"release_version",
162-
"tokens",
163-
"host_id",
164-
"schema_version",
165-
"workload",
166-
"graph",
167-
"dse_version",
168-
"listen_address",
169-
"listen_port",
170-
"rpc_address", // legacy; guarded by contains() in code — harmless if absent
171-
"native_transport_address", // same
172-
"native_transport_port", // same
173-
"native_transport_port_ssl"); // same
174-
17589
private static final VersionNumber _3_11 = VersionNumber.parse("3.11.0");
17690

17791
@VisibleForTesting
@@ -188,13 +102,7 @@ class ControlConnection implements Connection.Owner {
188102
// from here on out.
189103
private volatile boolean isPeersV2 = true;
190104

191-
// Column projection caches. null = uninitialized: the first query to each system table issues
192-
// SELECT * to discover which columns the server exposes, then subsequent queries project only
193-
// the intersection of those columns with the corresponding *_COLUMNS_OF_INTEREST set.
194-
// Reset to null on every new connection so that the new server's schema is re-discovered.
195-
private volatile Set<String> localColumns = null;
196-
private volatile Set<String> peersColumns = null;
197-
private volatile Set<String> peersV2Columns = null;
105+
private final SystemColumnProjection projection = new SystemColumnProjection();
198106

199107
public ControlConnection(Cluster.Manager manager) {
200108
this.cluster = manager;
@@ -207,9 +115,7 @@ public ControlConnection(Cluster.Manager manager) {
207115
*/
208116
@VisibleForTesting
209117
void resetColumnCaches() {
210-
localColumns = null;
211-
peersColumns = null;
212-
peersV2Columns = null;
118+
projection.reset();
213119
}
214120

215121
// Only for the initial connection. Does not schedule retries if it fails
@@ -436,9 +342,7 @@ private Connection tryConnect(Host host, boolean isInitialConnection)
436342
// which columns this server exposes, rather than a projected query built for a
437343
// previous connection's server. The caches are populated during the queries below
438344
// and remain warm for the lifetime of this connection.
439-
localColumns = null;
440-
peersColumns = null;
441-
peersV2Columns = null;
345+
projection.reset();
442346

443347
// We need to refresh the node list first so we know about the cassandra version of
444348
// the node we're connecting to.
@@ -567,6 +471,11 @@ void refreshNodeListAndTokenMap() {
567471
} catch (ExecutionException e) {
568472
// If we're being shutdown during refresh, this can happen. That's fine so don't scare the
569473
// user.
474+
if (e.getCause() instanceof InvalidQueryException) {
475+
// A projected query referenced a column the server no longer exposes; reset caches so
476+
// the next connection re-discovers columns via SELECT *.
477+
projection.reset();
478+
}
570479
if (!isShutdown)
571480
logger.error(
572481
"[Control connection] Unexpected error while refreshing node list and token map", e);
@@ -605,10 +514,7 @@ private Row fetchNodeInfo(Host host, Connection c)
605514
if (isConnectedHost || host.getBroadcastSocketAddress() != null) {
606515
String query;
607516
if (isConnectedHost) {
608-
query =
609-
localColumns == null
610-
? SELECT_LOCAL
611-
: buildProjectedQuery("system.local", localColumns, "key='local'");
517+
query = projection.localQuery();
612518
} else {
613519
InetSocketAddress broadcastAddress = host.getBroadcastSocketAddress();
614520
// Always use SELECT * for single-row WHERE lookups. Projected queries are only used for
@@ -641,17 +547,11 @@ private Row fetchNodeInfo(Host host, Connection c)
641547
// the projected columns.
642548
if (row != null) {
643549
if (isConnectedHost) {
644-
if (localColumns == null) {
645-
localColumns = intersectWithNeeded(rs, LOCAL_COLUMNS_OF_INTEREST);
646-
}
550+
projection.populateLocal(rs);
647551
} else if (isPeersV2) {
648-
if (peersV2Columns == null) {
649-
peersV2Columns = intersectWithNeeded(rs, PEERS_V2_COLUMNS_OF_INTEREST);
650-
}
552+
projection.populatePeersV2(rs);
651553
} else {
652-
if (peersColumns == null) {
653-
peersColumns = intersectWithNeeded(rs, PEERS_COLUMNS_OF_INTEREST);
654-
}
554+
projection.populatePeers(rs);
655555
}
656556
return row;
657557
} else {
@@ -724,6 +624,11 @@ boolean refreshNodeInfo(Host host) {
724624
} catch (ExecutionException e) {
725625
// If we're being shutdown during refresh, this can happen. That's fine so don't scare the
726626
// user.
627+
if (e.getCause() instanceof InvalidQueryException) {
628+
// A projected query referenced a column the server no longer exposes; reset caches so
629+
// the next connection re-discovers columns via SELECT *.
630+
projection.reset();
631+
}
727632
if (!isShutdown)
728633
logger.debug("[Control connection] Unexpected error while refreshing node info", e);
729634
signalError();
@@ -859,13 +764,9 @@ private static void updateLocationInfo(
859764
*/
860765
private ListenableFuture<ResultSet> selectPeersFuture(final Connection connection) {
861766
if (isPeersV2) {
862-
String peersV2Query =
863-
peersV2Columns == null
864-
? SELECT_PEERS_V2
865-
: buildProjectedQuery("system.peers_v2", peersV2Columns, null);
866767
DefaultResultSetFuture peersV2Future =
867768
new DefaultResultSetFuture(
868-
null, cluster.protocolVersion(), new Requests.Query(peersV2Query));
769+
null, cluster.protocolVersion(), new Requests.Query(projection.peersV2Query()));
869770
connection.write(peersV2Future);
870771
final SettableFuture<ResultSet> peersFuture = SettableFuture.create();
871772
// if peers v2 query fails, query peers table instead.
@@ -875,9 +776,7 @@ private ListenableFuture<ResultSet> selectPeersFuture(final Connection connectio
875776

876777
@Override
877778
public void onSuccess(ResultSet result) {
878-
if (peersV2Columns == null) {
879-
peersV2Columns = intersectWithNeeded(result, PEERS_V2_COLUMNS_OF_INTEREST);
880-
}
779+
projection.populatePeersV2(result);
881780
peersFuture.set(result);
882781
}
883782

@@ -891,9 +790,9 @@ public void onFailure(Throwable t) {
891790
|| (t instanceof ServerError
892791
&& t.getMessage().contains("Unknown keyspace/cf pair (system.peers_v2)"))) {
893792
isPeersV2 = false;
894-
// Also reset the peers cache so the first system.peers query issues SELECT *
895-
// to discover which columns that table exposes on this server.
896-
peersColumns = null;
793+
// Reset all caches: peersV2Columns is now stale, and peers cache should be cleared
794+
// so the first system.peers query re-discovers columns via SELECT *.
795+
projection.reset();
897796
MoreFutures.propagateFuture(peersFuture, selectPeersFuture(connection));
898797
} else {
899798
peersFuture.setException(t);
@@ -903,28 +802,27 @@ public void onFailure(Throwable t) {
903802
MoreExecutors.directExecutor());
904803
return peersFuture;
905804
} else {
906-
String peersQuery =
907-
peersColumns == null
908-
? SELECT_PEERS
909-
: buildProjectedQuery("system.peers", peersColumns, null);
910805
DefaultResultSetFuture rawFuture =
911806
new DefaultResultSetFuture(
912-
null, cluster.protocolVersion(), new Requests.Query(peersQuery));
807+
null, cluster.protocolVersion(), new Requests.Query(projection.peersQuery()));
913808
connection.write(rawFuture);
914809
final SettableFuture<ResultSet> peersFuture = SettableFuture.create();
915810
Futures.addCallback(
916811
rawFuture,
917812
new FutureCallback<ResultSet>() {
918813
@Override
919814
public void onSuccess(ResultSet result) {
920-
if (peersColumns == null) {
921-
peersColumns = intersectWithNeeded(result, PEERS_COLUMNS_OF_INTEREST);
922-
}
815+
projection.populatePeers(result);
923816
peersFuture.set(result);
924817
}
925818

926819
@Override
927820
public void onFailure(Throwable t) {
821+
if (t instanceof InvalidQueryException) {
822+
// The projected query referenced a column the server no longer exposes; reset
823+
// caches so the next query re-discovers columns via SELECT *.
824+
projection.reset();
825+
}
928826
peersFuture.setException(t);
929827
}
930828
},
@@ -933,35 +831,6 @@ public void onFailure(Throwable t) {
933831
}
934832
}
935833

936-
/**
937-
* Returns the intersection of the columns returned by the server (from {@code rs}) with the given
938-
* {@code needed} set, or {@code null} if the intersection is empty. The result is used to cache
939-
* projected column lists so subsequent queries fetch only what the driver actually reads. A
940-
* {@code null} return keeps the cache in the "uninitialized" sentinel state, ensuring the driver
941-
* continues issuing {@code SELECT *} rather than generating an invalid empty-column projection.
942-
*/
943-
@VisibleForTesting
944-
static Set<String> intersectWithNeeded(ResultSet rs, ImmutableSet<String> needed) {
945-
ImmutableSet.Builder<String> result = ImmutableSet.builder();
946-
for (ColumnDefinitions.Definition def : rs.getColumnDefinitions()) {
947-
if (needed.contains(def.getName())) {
948-
result.add(def.getName());
949-
}
950-
}
951-
ImmutableSet<String> built = result.build();
952-
return built.isEmpty() ? null : built;
953-
}
954-
955-
/**
956-
* Builds a {@code SELECT col1, col2, ... FROM table [WHERE whereClause]} query string from the
957-
* given projected column set. {@code whereClause} may be {@code null} for table-wide scans.
958-
*/
959-
@VisibleForTesting
960-
static String buildProjectedQuery(String table, Set<String> columns, String whereClause) {
961-
String query = "SELECT " + String.join(", ", columns) + " FROM " + table;
962-
return whereClause != null ? query + " WHERE " + whereClause : query;
963-
}
964-
965834
private void refreshNodeListAndTokenMap(
966835
final Connection connection,
967836
final Cluster.Manager cluster,
@@ -975,12 +844,9 @@ private void refreshNodeListAndTokenMap(
975844

976845
// Make sure we're up to date on nodes and tokens
977846

978-
String localQuery =
979-
localColumns == null
980-
? SELECT_LOCAL
981-
: buildProjectedQuery("system.local", localColumns, "key='local'");
982847
DefaultResultSetFuture localFuture =
983-
new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(localQuery));
848+
new DefaultResultSetFuture(
849+
null, cluster.protocolVersion(), new Requests.Query(projection.localQuery()));
984850
ListenableFuture<ResultSet> peersFuture = selectPeersFuture(connection);
985851
connection.write(localFuture);
986852

@@ -990,9 +856,7 @@ private void refreshNodeListAndTokenMap(
990856

991857
// Update cluster name, DC and rack for the one node we are connected to
992858
ResultSet localRs = localFuture.get();
993-
if (localColumns == null) {
994-
localColumns = intersectWithNeeded(localRs, LOCAL_COLUMNS_OF_INTEREST);
995-
}
859+
projection.populateLocal(localRs);
996860
Row localRow = localRs.one();
997861
if (localRow == null) {
998862
throw new IllegalStateException(

0 commit comments

Comments
 (0)