Skip to content

Commit 813c9d9

Browse files
nikagradkropachev
authored andcommitted
[scylladb#597] Updating extension negotiation logic. Adding ProtocolFeatures to FrameEncoder and FrameDecoder constructors. Updating integration tests.
1 parent da1a4ca commit 813c9d9

11 files changed

Lines changed: 81 additions & 43 deletions

File tree

core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
4848
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
4949
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
50+
import com.datastax.oss.protocol.internal.ProtocolFeatures;
5051
import io.netty.bootstrap.Bootstrap;
5152
import io.netty.channel.Channel;
5253
import io.netty.channel.ChannelFuture;
@@ -256,7 +257,7 @@ private void connect(
256257
cf -> {
257258
if (connectFuture.isSuccess()) {
258259
Channel channel = connectFuture.channel();
259-
DriverChannel driverChannel =
260+
DriverChannel driverChannel = // driver channel init
260261
new DriverChannel(endPoint, channel, context.getWriteCoalescer(), currentVersion);
261262
// If this is the first successful connection, remember the protocol version and
262263
// cluster name for future connections.
@@ -330,7 +331,7 @@ ChannelInitializer<Channel> initializer(
330331
CompletableFuture<DriverChannel> resultFuture) {
331332
return new ChannelFactoryInitializer(
332333
endPoint, protocolVersion, options, nodeMetricUpdater, resultFuture);
333-
};
334+
}
334335

335336
class ChannelFactoryInitializer extends ChannelInitializer<Channel> {
336337

@@ -422,10 +423,10 @@ protected void initChannel(Channel channel) {
422423
pipeline
423424
.addLast(
424425
FRAME_TO_BYTES_ENCODER_NAME,
425-
new FrameEncoder(context.getFrameCodec(), maxFrameLength))
426+
new FrameEncoder(context.getFrameCodec(), new ProtocolFeatures(), maxFrameLength))
426427
.addLast(
427428
BYTES_TO_FRAME_DECODER_NAME,
428-
new FrameDecoder(context.getFrameCodec(), maxFrameLength))
429+
new FrameDecoder(context.getFrameCodec(), new ProtocolFeatures(), maxFrameLength))
429430
// Note: HeartbeatHandler is inserted here once init completes
430431
.addLast(INFLIGHT_HANDLER_NAME, inFlightHandler)
431432
.addLast(INIT_HANDLER_NAME, initHandler);

core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.datastax.oss.driver.internal.core.session.DefaultSession;
3939
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
4040
import com.datastax.oss.protocol.internal.Message;
41+
import com.datastax.oss.protocol.internal.ProtocolFeatures;
4142
import io.netty.channel.Channel;
4243
import io.netty.channel.ChannelConfig;
4344
import io.netty.channel.ChannelFuture;

core/src/main/java/com/datastax/oss/driver/internal/core/protocol/FrameDecoder.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.datastax.oss.protocol.internal.Frame;
2323
import com.datastax.oss.protocol.internal.FrameCodec;
2424
import com.datastax.oss.protocol.internal.ProtocolConstants;
25+
import com.datastax.oss.protocol.internal.ProtocolFeatures;
2526
import com.datastax.oss.protocol.internal.response.Error;
2627
import io.netty.buffer.ByteBuf;
2728
import io.netty.channel.ChannelHandlerContext;
@@ -41,11 +42,16 @@ public class FrameDecoder extends LengthFieldBasedFrameDecoder {
4142
private static final int LENGTH_FIELD_LENGTH = 4;
4243

4344
private final FrameCodec<ByteBuf> frameCodec;
45+
private final ProtocolFeatures protocolFeatures;
4446
private boolean isFirstResponse;
4547

46-
public FrameDecoder(FrameCodec<ByteBuf> frameCodec, int maxFrameLengthInBytes) {
48+
public FrameDecoder(
49+
FrameCodec<ByteBuf> frameCodec,
50+
ProtocolFeatures protocolFeatures,
51+
int maxFrameLengthInBytes) {
4752
super(maxFrameLengthInBytes, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, 0, 0, true);
4853
this.frameCodec = frameCodec;
54+
this.protocolFeatures = protocolFeatures;
4955
}
5056

5157
@Override
@@ -87,7 +93,7 @@ protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception
8793
ByteBuf buffer = (ByteBuf) super.decode(ctx, in);
8894
return (buffer == null)
8995
? null // did not receive whole frame yet, keep reading
90-
: frameCodec.decode(buffer);
96+
: frameCodec.decode(buffer, protocolFeatures);
9197
} catch (Exception e) {
9298
// If decoding failed, try to read at least the stream id, so that the error can be
9399
// propagated to the client request matching that id (otherwise we have to fail all

core/src/main/java/com/datastax/oss/driver/internal/core/protocol/FrameEncoder.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
2121
import com.datastax.oss.protocol.internal.Frame;
2222
import com.datastax.oss.protocol.internal.FrameCodec;
23+
import com.datastax.oss.protocol.internal.ProtocolFeatures;
2324
import io.netty.buffer.ByteBuf;
2425
import io.netty.channel.ChannelHandler;
2526
import io.netty.channel.ChannelHandlerContext;
@@ -32,17 +33,20 @@
3233
public class FrameEncoder extends MessageToMessageEncoder<Frame> {
3334

3435
private final FrameCodec<ByteBuf> frameCodec;
36+
private final ProtocolFeatures protocolFeatures;
3537
private final int maxFrameLength;
3638

37-
public FrameEncoder(FrameCodec<ByteBuf> frameCodec, int maxFrameLength) {
39+
public FrameEncoder(
40+
FrameCodec<ByteBuf> frameCodec, ProtocolFeatures protocolFeatures, int maxFrameLength) {
3841
super(Frame.class);
3942
this.frameCodec = frameCodec;
43+
this.protocolFeatures = protocolFeatures;
4044
this.maxFrameLength = maxFrameLength;
4145
}
4246

4347
@Override
4448
protected void encode(ChannelHandlerContext ctx, Frame frame, List<Object> out) throws Exception {
45-
ByteBuf buffer = frameCodec.encode(frame);
49+
ByteBuf buffer = frameCodec.encode(frame, protocolFeatures);
4650
int actualLength = buffer.readableBytes();
4751
if (actualLength > maxFrameLength) {
4852
throw new FrameTooLongException(

core/src/main/java/com/datastax/oss/driver/internal/core/protocol/MetadataIdInfo.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,16 @@ public class MetadataIdInfo {
77
private static final String SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY = "SCYLLA_USE_METADATA_ID";
88
private static final String SCYLLA_USE_METADATA_ID_STARTUP_OPTION_VALUE = "";
99

10-
private final boolean enabled;
10+
private MetadataIdInfo() {}
1111

12-
private MetadataIdInfo(boolean enabled) {
13-
this.enabled = enabled;
14-
}
15-
16-
public boolean isEnabled() {
17-
return enabled;
18-
}
19-
20-
public static MetadataIdInfo parseMetadataId(Map<String, List<String>> supported) {
12+
public static boolean parseMetadataId(Map<String, List<String>> supported) {
2113
if (!supported.containsKey(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY)) {
22-
return new MetadataIdInfo(false);
14+
return false;
2315
}
2416
List<String> values = supported.get(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_KEY);
25-
return new MetadataIdInfo(
26-
values != null
27-
&& values.size() == 1
28-
&& values.get(0).equals(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_VALUE));
17+
return values != null
18+
&& values.size() == 1
19+
&& values.get(0).equals(SCYLLA_USE_METADATA_ID_STARTUP_OPTION_VALUE);
2920
}
3021

3122
public static void addOption(Map<String, String> options) {

core/src/main/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoder.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.datastax.oss.protocol.internal.Frame;
2121
import com.datastax.oss.protocol.internal.FrameCodec;
22+
import com.datastax.oss.protocol.internal.ProtocolFeatures;
2223
import com.datastax.oss.protocol.internal.Segment;
2324
import edu.umd.cs.findbugs.annotations.NonNull;
2425
import io.netty.buffer.ByteBuf;
@@ -73,7 +74,7 @@ private void decodeSelfContained(Segment<ByteBuf> segment, List<Object> out) {
7374
int frameCount = 0;
7475
try {
7576
do {
76-
Frame frame = frameCodec.decode(payload);
77+
Frame frame = frameCodec.decode(payload, new ProtocolFeatures());
7778
LOG.trace(
7879
"[{}] Decoded response frame {} from self-contained segment",
7980
logPrefix,
@@ -110,7 +111,7 @@ private void decodeSlice(Segment<ByteBuf> segment, ByteBufAllocator allocator, L
110111
encodedFrame.addComponents(true, accumulatedSlices);
111112
Frame frame;
112113
try {
113-
frame = frameCodec.decode(encodedFrame);
114+
frame = frameCodec.decode(encodedFrame, new ProtocolFeatures());
114115
} finally {
115116
encodedFrame.release();
116117
// Reset our state

core/src/test/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandlerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public class ProtocolInitHandlerTest extends ChannelHandlerTestBase {
8484
@Mock private DriverExecutionProfile defaultProfile;
8585
@Mock private Appender<ILoggingEvent> appender;
8686

87-
private ProtocolVersionRegistry protocolVersionRegistry =
87+
private final ProtocolVersionRegistry protocolVersionRegistry =
8888
new DefaultProtocolVersionRegistry("test");
8989
private HeartbeatHandler heartbeatHandler;
9090

@@ -365,7 +365,7 @@ public void should_invoke_auth_provider_when_server_does_not_send_challenge() {
365365
}
366366

367367
@Test
368-
public void should_fail_to_initialize_if_server_sends_auth_error() throws Throwable {
368+
public void should_fail_to_initialize_if_server_sends_auth_error() {
369369
channel
370370
.pipeline()
371371
.addLast(
@@ -443,7 +443,7 @@ public void should_check_cluster_name_if_provided() {
443443
}
444444

445445
@Test
446-
public void should_fail_to_initialize_if_cluster_name_does_not_match() throws Throwable {
446+
public void should_fail_to_initialize_if_cluster_name_does_not_match() {
447447
channel
448448
.pipeline()
449449
.addLast(

core/src/test/java/com/datastax/oss/driver/internal/core/protocol/FrameDecoderTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.datastax.oss.protocol.internal.Compressor;
2727
import com.datastax.oss.protocol.internal.Frame;
2828
import com.datastax.oss.protocol.internal.FrameCodec;
29+
import com.datastax.oss.protocol.internal.ProtocolFeatures;
2930
import com.datastax.oss.protocol.internal.response.AuthSuccess;
3031
import io.netty.buffer.ByteBuf;
3132
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -63,7 +64,7 @@ public void setup() {
6364
@Test
6465
public void should_decode_valid_payload() {
6566
// Given
66-
FrameDecoder decoder = new FrameDecoder(frameCodec, 1024);
67+
FrameDecoder decoder = new FrameDecoder(frameCodec, new ProtocolFeatures(), 1024);
6768
channel.pipeline().addLast(decoder);
6869

6970
// When
@@ -83,7 +84,8 @@ public void should_decode_valid_payload() {
8384
@Test
8485
public void should_fail_to_decode_if_payload_is_valid_but_too_long() {
8586
// Given
86-
FrameDecoder decoder = new FrameDecoder(frameCodec, VALID_PAYLOAD.readableBytes() - 1);
87+
FrameDecoder decoder =
88+
new FrameDecoder(frameCodec, new ProtocolFeatures(), VALID_PAYLOAD.readableBytes() - 1);
8789
channel.pipeline().addLast(decoder);
8890

8991
// When
@@ -102,7 +104,7 @@ public void should_fail_to_decode_if_payload_is_valid_but_too_long() {
102104
@Test
103105
public void should_fail_to_decode_if_payload_cannot_be_decoded() {
104106
// Given
105-
FrameDecoder decoder = new FrameDecoder(frameCodec, 1024);
107+
FrameDecoder decoder = new FrameDecoder(frameCodec, new ProtocolFeatures(), 1024);
106108
channel.pipeline().addLast(decoder);
107109

108110
// When

core/src/test/java/com/datastax/oss/driver/internal/core/protocol/SegmentToFrameDecoderTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.datastax.oss.protocol.internal.FrameCodec;
2626
import com.datastax.oss.protocol.internal.Message;
2727
import com.datastax.oss.protocol.internal.ProtocolConstants;
28+
import com.datastax.oss.protocol.internal.ProtocolFeatures;
2829
import com.datastax.oss.protocol.internal.ProtocolV5ClientCodecs;
2930
import com.datastax.oss.protocol.internal.ProtocolV5ServerCodecs;
3031
import com.datastax.oss.protocol.internal.Segment;
@@ -93,6 +94,6 @@ private static ByteBuf encodeFrame(Message message) {
9394
Collections.emptyMap(),
9495
Collections.emptyList(),
9596
message);
96-
return FRAME_CODEC.encode(frame);
97+
return FRAME_CODEC.encode(frame, new ProtocolFeatures());
9798
}
9899
}

integration-tests/src/test/java/com/datastax/oss/driver/core/cql/PreparedStatementIT.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,9 @@
7979
@Category(ParallelizableTests.class)
8080
public class PreparedStatementIT {
8181

82-
private CcmRule ccmRule = CcmRule.getInstance();
82+
private final CcmRule ccmRule = CcmRule.getInstance();
8383

84-
private SessionRule<CqlSession> sessionRule =
84+
private final SessionRule<CqlSession> sessionRule =
8585
SessionRule.builder(ccmRule)
8686
.withConfigLoader(
8787
SessionUtils.configLoaderBuilder()
@@ -159,8 +159,8 @@ public void should_have_non_empty_variable_definitions_for_select_query_with_bou
159159

160160
@Test
161161
@BackendRequirement(type = BackendType.CASSANDRA, minInclusive = "4.0")
162-
@BackendRequirement(type = BackendType.SCYLLA)
163-
public void should_update_metadata_when_schema_changed_across_executions() {
162+
@BackendRequirement(type = BackendType.SCYLLA, maxExclusive = "2025.3")
163+
public void should_update_metadata_when_schema_changed_across_executions_with_no_metadata_id_feature() {
164164
// Given
165165
CqlSession session = sessionRule.session();
166166
PreparedStatement ps = session.prepare("SELECT * FROM prepared_statement_test WHERE a = ?");
@@ -205,6 +205,37 @@ public void should_update_metadata_when_schema_changed_across_executions() {
205205
}
206206
}
207207

208+
@Test
209+
@BackendRequirement(type = BackendType.SCYLLA, minInclusive = "2025.3", description = "SCYLLA_USE_METADATA_ID feature is added in Scylla 2025.3")
210+
public void should_update_metadata_when_schema_changed_across_executions_with_metadata_id_feature_support() {
211+
// Given
212+
CqlSession session = sessionRule.session();
213+
PreparedStatement ps = session.prepare("SELECT * FROM prepared_statement_test WHERE a = ?");
214+
ByteBuffer idBefore = ps.getResultMetadataId();
215+
assertThat(idBefore).isNotNull();
216+
217+
// When
218+
session.execute(
219+
SimpleStatement.builder("ALTER TABLE prepared_statement_test ADD d int")
220+
.setExecutionProfile(sessionRule.slowProfile())
221+
.build());
222+
BoundStatement bs = ps.bind(1);
223+
ResultSet rows = session.execute(bs);
224+
225+
// Then
226+
ByteBuffer idAfter = ps.getResultMetadataId();
227+
assertThat(idAfter).isNotNull();
228+
assertThat(Bytes.toHexString(idAfter)).isNotEqualTo(Bytes.toHexString(idBefore));
229+
for (ColumnDefinitions columnDefinitions :
230+
ImmutableList.of(
231+
ps.getResultSetDefinitions(),
232+
bs.getPreparedStatement().getResultSetDefinitions(),
233+
rows.getColumnDefinitions())) {
234+
assertThat(columnDefinitions).hasSize(4);
235+
assertThat(columnDefinitions.get("d").getType()).isEqualTo(DataTypes.INT);
236+
}
237+
}
238+
208239
@Test
209240
@BackendRequirement(type = BackendType.CASSANDRA, minInclusive = "4.0")
210241
@BackendRequirement(type = BackendType.SCYLLA)

0 commit comments

Comments
 (0)