forked from apache/cassandra-java-driver
-
Notifications
You must be signed in to change notification settings - Fork 41
feat: optimize system table queries with column projection (DRIVER-368) #862
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
nikagra
wants to merge
2
commits into
scylladb:scylla-3.x
Choose a base branch
from
nikagra:driver-368-optimize-system-local-query
base: scylla-3.x
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,7 @@ | |
| import com.datastax.driver.core.utils.MoreFutures; | ||
| import com.datastax.driver.core.utils.MoreObjects; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.collect.ImmutableSet; | ||
| import com.google.common.collect.Iterators; | ||
| import com.google.common.util.concurrent.FutureCallback; | ||
| import com.google.common.util.concurrent.Futures; | ||
|
|
@@ -86,6 +87,91 @@ class ControlConnection implements Connection.Owner { | |
| private static final String SELECT_SCHEMA_LOCAL = | ||
| "SELECT schema_version, host_id FROM system.local WHERE key='local'"; | ||
|
|
||
| // IMPORTANT: Every column read from system.local rows — in updateInfo(), | ||
| // refreshNodeListAndTokenMap(), isValidPeer(), and DefaultEndPointFactory — MUST be listed here. | ||
| // If a new column read is added anywhere that consumes a system table row, add it to the | ||
| // appropriate set below, otherwise it will be silently excluded from projected queries. | ||
| @VisibleForTesting | ||
| static final ImmutableSet<String> LOCAL_COLUMNS_OF_INTEREST = | ||
| ImmutableSet.of( | ||
| "cluster_name", | ||
| "partitioner", | ||
| "data_center", | ||
| "rack", | ||
| "release_version", | ||
| "native_address", | ||
| "native_port", | ||
| "native_transport_address", | ||
| "native_transport_port", | ||
| "native_transport_port_ssl", | ||
| "rpc_address", | ||
| "broadcast_address", | ||
| "broadcast_port", | ||
| "listen_address", | ||
| "listen_port", | ||
| "tokens", | ||
| "host_id", | ||
| "schema_version", | ||
| "workload", | ||
| "graph", | ||
| "dse_version"); | ||
|
|
||
| // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. | ||
| // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), | ||
| // isValidPeer(), and DefaultEndPointFactory.create() from system.peers rows. | ||
| // Columns that are absent from the actual server schema are silently excluded by | ||
| // intersectWithNeeded(), so listing extra columns here is safe. | ||
| @VisibleForTesting | ||
| static final ImmutableSet<String> PEERS_COLUMNS_OF_INTEREST = | ||
| ImmutableSet.of( | ||
| "peer", | ||
| "peer_port", // peers_v2 column; harmless to list here — absent on peers, excluded safely | ||
| "rpc_address", | ||
| "data_center", | ||
| "rack", | ||
| "release_version", | ||
| "tokens", | ||
| "listen_address", | ||
| "listen_port", | ||
| "host_id", | ||
| "schema_version", | ||
| "native_address", // may appear on some server variants; guarded by contains() in code | ||
| "native_port", // same | ||
| "native_transport_address", | ||
| "native_transport_port", | ||
| "native_transport_port_ssl", | ||
| "workload", | ||
| "graph", | ||
| "dse_version"); | ||
|
|
||
| // IMPORTANT: see LOCAL_COLUMNS_OF_INTEREST note above. | ||
| // Includes all columns consumed by updateInfo(), refreshNodeListAndTokenMap(), | ||
| // isValidPeer(), and DefaultEndPointFactory.create() from system.peers_v2 rows. | ||
| // Columns that are absent from the actual server schema are silently excluded by | ||
| // intersectWithNeeded(), so listing extra columns here is safe. | ||
| @VisibleForTesting | ||
| static final ImmutableSet<String> PEERS_V2_COLUMNS_OF_INTEREST = | ||
| ImmutableSet.of( | ||
| "peer", | ||
| "peer_port", | ||
| "native_address", | ||
| "native_port", | ||
| "data_center", | ||
| "rack", | ||
| "release_version", | ||
| "tokens", | ||
| "host_id", | ||
| "schema_version", | ||
| "workload", | ||
| "graph", | ||
| "dse_version", | ||
| "listen_address", | ||
| "listen_port", | ||
| "rpc_address", // legacy; guarded by contains() in code — harmless if absent | ||
| "native_transport_address", // same | ||
| "native_transport_port", // same | ||
| "native_transport_port_ssl"); // same | ||
|
|
||
| private static final VersionNumber _3_11 = VersionNumber.parse("3.11.0"); | ||
|
|
||
| @VisibleForTesting | ||
|
|
@@ -102,10 +188,30 @@ class ControlConnection implements Connection.Owner { | |
| // from here on out. | ||
| private volatile boolean isPeersV2 = true; | ||
|
|
||
| // Column projection caches. null = uninitialized: the first query to each system table issues | ||
| // SELECT * to discover which columns the server exposes, then subsequent queries project only | ||
| // the intersection of those columns with the corresponding *_COLUMNS_OF_INTEREST set. | ||
| // Reset to null on every new connection so that the new server's schema is re-discovered. | ||
| private volatile Set<String> localColumns = null; | ||
| private volatile Set<String> peersColumns = null; | ||
| private volatile Set<String> peersV2Columns = null; | ||
|
|
||
| public ControlConnection(Cluster.Manager manager) { | ||
| this.cluster = manager; | ||
| } | ||
|
|
||
| /** | ||
| * Resets the projected-column caches so that the next query to each system table sends {@code | ||
| * SELECT *} and re-discovers available columns. Intended for use in tests that clear Scassandra | ||
| * primes between driver operations. | ||
| */ | ||
| @VisibleForTesting | ||
| void resetColumnCaches() { | ||
| localColumns = null; | ||
| peersColumns = null; | ||
| peersV2Columns = null; | ||
| } | ||
|
|
||
| // Only for the initial connection. Does not schedule retries if it fails | ||
| void connect() throws UnsupportedProtocolVersionException { | ||
| if (isShutdown) return; | ||
|
|
@@ -234,6 +340,11 @@ private void signalError() { | |
| private void setNewConnection(Connection newConnection) { | ||
| Host.statesLogger.debug("[Control connection] established to {}", newConnection.endPoint); | ||
| newConnection.setOwner(this); | ||
| // Reset column caches so the new connection triggers a fresh SELECT * to discover which | ||
| // columns the server exposes before switching to projected queries. | ||
| localColumns = null; | ||
| peersColumns = null; | ||
| peersV2Columns = null; | ||
| Connection old = connectionRef.getAndSet(newConnection); | ||
| if (old != null && !old.isClosed()) old.closeAsync().force(); | ||
| } | ||
|
|
@@ -326,6 +437,14 @@ private Connection tryConnect(Host host, boolean isInitialConnection) | |
| ProtocolEvent.Type.SCHEMA_CHANGE); | ||
| connection.write(new Requests.Register(evs)); | ||
|
|
||
| // Reset column caches so refreshNodeListAndTokenMap() uses SELECT * to rediscover | ||
| // which columns this server exposes, rather than a projected query built for the | ||
| // previous connection's server. setNewConnection() also resets these, but it is | ||
| // called after tryConnect() returns — too late to protect the queries below. | ||
| localColumns = null; | ||
| peersColumns = null; | ||
| peersV2Columns = null; | ||
|
|
||
| // We need to refresh the node list first so we know about the cassandra version of | ||
| // the node we're connecting to. | ||
| // This will create the token map for the first time, but it will be incomplete | ||
|
|
@@ -491,26 +610,54 @@ private Row fetchNodeInfo(Host host, Connection c) | |
| if (isConnectedHost || host.getBroadcastSocketAddress() != null) { | ||
| String query; | ||
| if (isConnectedHost) { | ||
| query = SELECT_LOCAL; | ||
| query = | ||
| localColumns == null | ||
| ? SELECT_LOCAL | ||
| : buildProjectedQuery("system.local", localColumns, "key='local'"); | ||
| } else { | ||
| InetSocketAddress broadcastAddress = host.getBroadcastSocketAddress(); | ||
| query = | ||
| isPeersV2 | ||
| ? SELECT_PEERS_V2 | ||
| + " WHERE peer='" | ||
| + broadcastAddress.getAddress().getHostAddress() | ||
| + "' AND peer_port=" | ||
| + broadcastAddress.getPort() | ||
| : SELECT_PEERS | ||
| + " WHERE peer='" | ||
| + broadcastAddress.getAddress().getHostAddress() | ||
| + "'"; | ||
| // Always use SELECT * for single-row WHERE lookups. Projected queries are only used for | ||
| // full-table scans via selectPeersFuture(), where the cache is guaranteed to be warm and | ||
| // every node has the projected full-scan prime registered. For WHERE lookups the control | ||
| // connection may query a node that was never restarted (and therefore still carries only | ||
| // the original SELECT * prime from init time), so projecting here risks a cache miss. | ||
| if (isPeersV2) { | ||
| String whereClause = | ||
| "peer='" | ||
| + broadcastAddress.getAddress().getHostAddress() | ||
| + "' AND peer_port=" | ||
| + broadcastAddress.getPort(); | ||
| query = SELECT_PEERS_V2 + " WHERE " + whereClause; | ||
| } else { | ||
| String whereClause = "peer='" + broadcastAddress.getAddress().getHostAddress() + "'"; | ||
| query = SELECT_PEERS + " WHERE " + whereClause; | ||
| } | ||
| } | ||
| DefaultResultSetFuture future = | ||
| new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(query)); | ||
| c.write(future); | ||
| Row row = future.get().one(); | ||
| ResultSet rs = future.get(); | ||
| Row row = rs.one(); | ||
| // Populate the column cache on first successful WHERE lookup so that subsequent full-table | ||
| // scans via selectPeersFuture() can send projected queries. Only populate when a row is | ||
| // found: if the WHERE returned empty we are about to fall through to the full-scan path, | ||
| // which has its own cache-population logic; populating here on an empty result would cause | ||
| // selectPeersFuture() to send a projected query before it has verified the server supports | ||
| // the projected columns. | ||
| if (row != null) { | ||
| if (isConnectedHost) { | ||
| if (localColumns == null) { | ||
| localColumns = intersectWithNeeded(rs, LOCAL_COLUMNS_OF_INTEREST); | ||
| } | ||
| } else if (isPeersV2) { | ||
| if (peersV2Columns == null) { | ||
| peersV2Columns = intersectWithNeeded(rs, PEERS_V2_COLUMNS_OF_INTEREST); | ||
| } | ||
| } else { | ||
| if (peersColumns == null) { | ||
| peersColumns = intersectWithNeeded(rs, PEERS_COLUMNS_OF_INTEREST); | ||
| } | ||
| } | ||
| return row; | ||
| } else { | ||
| InetSocketAddress address = host.getBroadcastSocketAddress(); | ||
|
|
@@ -717,9 +864,13 @@ private static void updateLocationInfo( | |
| */ | ||
| private ListenableFuture<ResultSet> selectPeersFuture(final Connection connection) { | ||
| if (isPeersV2) { | ||
| String peersV2Query = | ||
| peersV2Columns == null | ||
| ? SELECT_PEERS_V2 | ||
| : buildProjectedQuery("system.peers_v2", peersV2Columns, null); | ||
| DefaultResultSetFuture peersV2Future = | ||
| new DefaultResultSetFuture( | ||
| null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS_V2)); | ||
| null, cluster.protocolVersion(), new Requests.Query(peersV2Query)); | ||
| connection.write(peersV2Future); | ||
| final SettableFuture<ResultSet> peersFuture = SettableFuture.create(); | ||
| // if peers v2 query fails, query peers table instead. | ||
|
|
@@ -729,6 +880,9 @@ private ListenableFuture<ResultSet> selectPeersFuture(final Connection connectio | |
|
|
||
| @Override | ||
| public void onSuccess(ResultSet result) { | ||
| if (peersV2Columns == null) { | ||
| peersV2Columns = intersectWithNeeded(result, PEERS_V2_COLUMNS_OF_INTEREST); | ||
| } | ||
| peersFuture.set(result); | ||
| } | ||
|
|
||
|
|
@@ -742,6 +896,9 @@ public void onFailure(Throwable t) { | |
| || (t instanceof ServerError | ||
| && t.getMessage().contains("Unknown keyspace/cf pair (system.peers_v2)"))) { | ||
| isPeersV2 = false; | ||
| // Also reset the peers cache so the first system.peers query issues SELECT * | ||
| // to discover which columns that table exposes on this server. | ||
| peersColumns = null; | ||
| MoreFutures.propagateFuture(peersFuture, selectPeersFuture(connection)); | ||
| } else { | ||
| peersFuture.setException(t); | ||
|
|
@@ -751,14 +908,73 @@ public void onFailure(Throwable t) { | |
| MoreExecutors.directExecutor()); | ||
| return peersFuture; | ||
| } else { | ||
| DefaultResultSetFuture peersFuture = | ||
| String peersQuery = | ||
| peersColumns == null | ||
| ? SELECT_PEERS | ||
| : buildProjectedQuery("system.peers", peersColumns, null); | ||
| DefaultResultSetFuture rawFuture = | ||
| new DefaultResultSetFuture( | ||
| null, cluster.protocolVersion(), new Requests.Query(SELECT_PEERS)); | ||
| connection.write(peersFuture); | ||
| null, cluster.protocolVersion(), new Requests.Query(peersQuery)); | ||
| connection.write(rawFuture); | ||
| final SettableFuture<ResultSet> peersFuture = SettableFuture.create(); | ||
| Futures.addCallback( | ||
| rawFuture, | ||
| new FutureCallback<ResultSet>() { | ||
| @Override | ||
| public void onSuccess(ResultSet result) { | ||
| if (peersColumns == null) { | ||
| peersColumns = intersectWithNeeded(result, PEERS_COLUMNS_OF_INTEREST); | ||
| } | ||
| peersFuture.set(result); | ||
| } | ||
|
|
||
| @Override | ||
| public void onFailure(Throwable t) { | ||
| peersFuture.setException(t); | ||
| } | ||
| }, | ||
| MoreExecutors.directExecutor()); | ||
| return peersFuture; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns the intersection of the columns returned by the server (from {@code rs}) with the given | ||
| * {@code needed} set, or {@code null} if the intersection is empty. The result is used to cache | ||
| * projected column lists so subsequent queries fetch only what the driver actually reads. A | ||
| * {@code null} return keeps the cache in the "uninitialized" sentinel state, ensuring the driver | ||
| * continues issuing {@code SELECT *} rather than generating an invalid empty-column projection. | ||
| */ | ||
| @VisibleForTesting | ||
| static Set<String> intersectWithNeeded(ResultSet rs, ImmutableSet<String> needed) { | ||
| ImmutableSet.Builder<String> result = ImmutableSet.builder(); | ||
| for (ColumnDefinitions.Definition def : rs.getColumnDefinitions()) { | ||
| if (needed.contains(def.getName())) { | ||
| result.add(def.getName()); | ||
| } | ||
| } | ||
| ImmutableSet<String> built = result.build(); | ||
| return built.isEmpty() ? null : built; | ||
| } | ||
|
|
||
| /** | ||
| * Builds a {@code SELECT col1, col2, ... FROM table [WHERE whereClause]} query string from the | ||
| * given projected column set. {@code whereClause} may be {@code null} for table-wide scans. | ||
| */ | ||
| @VisibleForTesting | ||
| static String buildProjectedQuery(String table, Set<String> columns, String whereClause) { | ||
| StringBuilder sb = new StringBuilder("SELECT "); | ||
| boolean first = true; | ||
| for (String col : columns) { | ||
| if (!first) sb.append(", "); | ||
| sb.append(col); | ||
| first = false; | ||
| } | ||
| sb.append(" FROM ").append(table); | ||
| if (whereClause != null) sb.append(" WHERE ").append(whereClause); | ||
| return sb.toString(); | ||
| } | ||
|
nikagra marked this conversation as resolved.
|
||
|
|
||
| private void refreshNodeListAndTokenMap( | ||
| final Connection connection, | ||
| final Cluster.Manager cluster, | ||
|
|
@@ -772,9 +988,12 @@ private void refreshNodeListAndTokenMap( | |
|
|
||
| // Make sure we're up to date on nodes and tokens | ||
|
|
||
| String localQuery = | ||
| localColumns == null | ||
| ? SELECT_LOCAL | ||
| : buildProjectedQuery("system.local", localColumns, "key='local'"); | ||
| DefaultResultSetFuture localFuture = | ||
| new DefaultResultSetFuture( | ||
| null, cluster.protocolVersion(), new Requests.Query(SELECT_LOCAL)); | ||
| new DefaultResultSetFuture(null, cluster.protocolVersion(), new Requests.Query(localQuery)); | ||
| ListenableFuture<ResultSet> peersFuture = selectPeersFuture(connection); | ||
| connection.write(localFuture); | ||
|
|
||
|
|
@@ -783,7 +1002,11 @@ private void refreshNodeListAndTokenMap( | |
| Map<Host, Set<Token>> tokenMap = new HashMap<Host, Set<Token>>(); | ||
|
|
||
| // Update cluster name, DC and rack for the one node we are connected to | ||
| Row localRow = localFuture.get().one(); | ||
| ResultSet localRs = localFuture.get(); | ||
| if (localColumns == null) { | ||
| localColumns = intersectWithNeeded(localRs, LOCAL_COLUMNS_OF_INTEREST); | ||
| } | ||
|
Comment on lines
+1006
to
+1008
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why do you need this injection? isn't |
||
| Row localRow = localRs.one(); | ||
| if (localRow == null) { | ||
| throw new IllegalStateException( | ||
| String.format( | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure you need this guard? What if it reconnects to a host that has fewer columns than before?