Skip to content

Commit ee0dcc3

Browse files
committed
Merge branch 'cassandra-6.0' into trunk
2 parents 5ddc434 + 03422ed commit ee0dcc3

5 files changed

Lines changed: 266 additions & 1 deletion

File tree

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
* Allow nodetool garbagecollect to take a user defined list of SSTables (CASSANDRA-16767)
55
* Add a guardrail for misprepared statements (CASSANDRA-21139)
66
Merged from 6.0:
7+
* Relax assertion on partitioner instances in SinglePartitionReadCommand (CASSANDRA-21251)
78
* Report cancelled read command execution to coordinator as a RequestFailure.TIMEOUT (CASSANDRA-21468)
89
* Add policy for selecting CMS host when submitting commit request (CASSANDRA-21456)
910
* Fix TCM log catchup from peer with snapshots and gaps in the log sequence (CASSANDRA-21455)

src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
7171
import org.apache.cassandra.db.virtual.VirtualTable;
7272
import org.apache.cassandra.dht.Bounds;
73+
import org.apache.cassandra.dht.IPartitioner;
7374
import org.apache.cassandra.exceptions.RequestExecutionException;
7475
import org.apache.cassandra.index.Index;
7576
import org.apache.cassandra.io.sstable.SSTableReadsListener;
@@ -121,7 +122,8 @@ protected SinglePartitionReadCommand(Epoch serializedAtEpoch,
121122
DataRange dataRange)
122123
{
123124
super(serializedAtEpoch, Kind.SINGLE_PARTITION, isDigest, digestVersion, acceptsTransient, potentialTxnConflicts, metadata, nowInSec, columnFilter, rowFilter, limits, indexQueryPlan, trackWarnings, dataRange);
124-
assert partitionKey.getPartitioner() == metadata.partitioner;
125+
assert IPartitioner.equivalent(partitionKey.getPartitioner(), metadata.partitioner) : String.format("Mismatching partitioners for key (%s) and table metadata (%s)",
126+
partitionKey.getPartitioner(), metadata.partitioner);
125127
this.partitionKey = partitionKey;
126128
this.clusteringIndexFilter = clusteringIndexFilter;
127129
}

src/java/org/apache/cassandra/dht/IPartitioner.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ static IPartitioner global()
4141
return DatabaseDescriptor.getPartitioner();
4242
}
4343

44+
static boolean equivalent(IPartitioner p1, IPartitioner p2)
45+
{
46+
if (p1 == p2) return true;
47+
if (p1.getClass() != p2.getClass()) return false;
48+
if (p1.getClass() == LocalPartitioner.class)
49+
return ((LocalPartitioner)p1).comparator == ((LocalPartitioner)p2).comparator;
50+
return true;
51+
}
52+
4453
/**
4554
* @return a new instance of a reusable key
4655
*/
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.distributed.test.tcm;
20+
21+
import java.io.IOException;
22+
import java.util.concurrent.Callable;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
27+
import com.google.common.util.concurrent.Uninterruptibles;
28+
29+
import net.bytebuddy.ByteBuddy;
30+
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
31+
import net.bytebuddy.implementation.MethodDelegation;
32+
import net.bytebuddy.implementation.bind.annotation.SuperCall;
33+
34+
import org.awaitility.Awaitility;
35+
import org.junit.Test;
36+
37+
import org.apache.cassandra.db.DecoratedKey;
38+
import org.apache.cassandra.db.ReadCommand;
39+
import org.apache.cassandra.db.ReadExecutionController;
40+
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
41+
import org.apache.cassandra.distributed.Cluster;
42+
import org.apache.cassandra.distributed.api.ConsistencyLevel;
43+
import org.apache.cassandra.distributed.shared.ClusterUtils;
44+
import org.apache.cassandra.distributed.test.TestBaseImpl;
45+
import org.apache.cassandra.index.internal.CassandraIndexSearcher;
46+
import org.apache.cassandra.net.Verb;
47+
import org.apache.cassandra.tcm.Epoch;
48+
49+
import static net.bytebuddy.matcher.ElementMatchers.named;
50+
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
51+
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
52+
import static org.junit.Assert.assertEquals;
53+
54+
public class IndexTablePartitionerAfterForceSnapshotTest extends TestBaseImpl
55+
{
56+
@Test
57+
public void indexQueryAfterSnapshotTest() throws IOException, InterruptedException
58+
{
59+
try (Cluster cluster = Cluster.build(3)
60+
.withInstanceInitializer(BBInstaller::install)
61+
.withConfig(config -> config.with(GOSSIP, NETWORK))
62+
.start())
63+
{
64+
init(cluster);
65+
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
66+
cluster.schemaChange(withKeyspace("create index iii2 on %s.tbl (v)"));
67+
for (int i = 0; i < 10000; i++)
68+
cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck, v) values (?, ?, ?)"), ConsistencyLevel.ALL, 1, i, i % 5);
69+
70+
// drop metadata change replication messages from node 1 to nodes 2 & 3
71+
cluster.filters().verbs(Verb.TCM_REPLICATION.id,
72+
Verb.TCM_FETCH_CMS_LOG_RSP.id,
73+
Verb.TCM_FETCH_PEER_LOG_RSP.id,
74+
Verb.TCM_CURRENT_EPOCH_REQ.id)
75+
.from(1)
76+
.drop()
77+
.on();
78+
79+
// node1 makes some metadata changes interspersed with snapshots. When the message filters are dropped, this
80+
// will cause nodes 2 & 3 to try and catchup. We want multiple snapshots to be taken to ensure that the
81+
// catchup responses contain one, rather than just a list of entries.
82+
cluster.get(1).nodetoolResult("cms", "snapshot").asserts().success();
83+
cluster.get(1).nodetoolResult("cms", "snapshot").asserts().success();
84+
final Epoch epoch = ClusterUtils.getCurrentEpoch(cluster.get(1));
85+
// start executing
86+
AtomicBoolean stop = new AtomicBoolean();
87+
AtomicInteger queryFailures = new AtomicInteger();
88+
Thread t = new Thread(() -> {
89+
while (!stop.get())
90+
{
91+
try
92+
{
93+
cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl where pk=1 and v=4"), ConsistencyLevel.ALL);
94+
}
95+
catch (Throwable e)
96+
{
97+
queryFailures.incrementAndGet();
98+
}
99+
}
100+
});
101+
t.start();
102+
TimeUnit.SECONDS.sleep(1);
103+
// drop the filters and wait for nodes 2 & 3 to catch up
104+
cluster.filters().reset();
105+
Awaitility.waitAtMost(30, TimeUnit.SECONDS)
106+
.pollInterval(1, TimeUnit.SECONDS)
107+
.until(() -> ClusterUtils.getCurrentEpoch(cluster.get(2)).isEqualOrAfter(epoch) &&
108+
ClusterUtils.getCurrentEpoch(cluster.get(3)).isEqualOrAfter(epoch));
109+
110+
// give it another second of querying
111+
TimeUnit.SECONDS.sleep(1);
112+
stop.set(true);
113+
t.join();
114+
assertEquals(0, queryFailures.get());
115+
}
116+
}
117+
118+
public static class BBInstaller
119+
{
120+
public static void install(ClassLoader classLoader, int inst)
121+
{
122+
if (inst == 1)
123+
return;
124+
new ByteBuddy().rebase(CassandraIndexSearcher.class)
125+
.method(named("queryIndex"))
126+
.intercept(MethodDelegation.to(BBInterceptor.class))
127+
.make()
128+
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
129+
}
130+
}
131+
132+
public static class BBInterceptor
133+
{
134+
public static UnfilteredRowIterator queryIndex(DecoratedKey indexKey, ReadCommand command, ReadExecutionController executionController, @SuperCall Callable<UnfilteredRowIterator> zuper) throws Exception
135+
{
136+
// this makes it more likely that we decorate the key with one partitioner and execute the SinglePartitionReadCommand with another
137+
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
138+
return zuper.call();
139+
}
140+
}
141+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.distributed.upgrade;
20+
21+
import java.util.concurrent.Callable;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.function.Consumer;
26+
27+
import com.google.common.util.concurrent.Uninterruptibles;
28+
29+
import net.bytebuddy.ByteBuddy;
30+
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
31+
import net.bytebuddy.implementation.MethodDelegation;
32+
import net.bytebuddy.implementation.bind.annotation.SuperCall;
33+
34+
import org.junit.Test;
35+
36+
import org.apache.cassandra.db.DecoratedKey;
37+
import org.apache.cassandra.db.ReadCommand;
38+
import org.apache.cassandra.db.ReadExecutionController;
39+
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
40+
import org.apache.cassandra.distributed.UpgradeableCluster;
41+
import org.apache.cassandra.distributed.api.ConsistencyLevel;
42+
import org.apache.cassandra.distributed.api.Feature;
43+
import org.apache.cassandra.index.internal.CassandraIndexSearcher;
44+
45+
import static net.bytebuddy.matcher.ElementMatchers.named;
46+
import static org.junit.Assert.assertEquals;
47+
48+
public class ClusterMetadata2iInitializeTest extends UpgradeTestBase
49+
{
50+
@Test
51+
public void initializeCMSWithConcurrentIndexReadsTest() throws Throwable
52+
{
53+
Consumer<UpgradeableCluster.Builder > builderUpdater = builder -> builder.withInstanceInitializer(BBInstaller::install);
54+
new TestCase()
55+
.nodes(3)
56+
.withConfig((cfg) -> cfg.with(Feature.GOSSIP))
57+
.withBuilder(builderUpdater)
58+
.upgradesToCurrentFrom(v41)
59+
.setup((cluster) -> {
60+
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
61+
cluster.schemaChange(withKeyspace("create index iii2 on %s.tbl (v)"));
62+
for (int i = 0; i < 10000; i++)
63+
cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (pk, ck, v) values (?, ?, ?)"), ConsistencyLevel.ALL, 1, i, i%5);
64+
})
65+
.runAfterClusterUpgrade((cluster) -> {
66+
AtomicBoolean stop = new AtomicBoolean();
67+
AtomicInteger queryFailures = new AtomicInteger();
68+
Thread t = new Thread(() -> {
69+
while (!stop.get())
70+
{
71+
try
72+
{
73+
cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl where pk=1 and v=4"), ConsistencyLevel.ALL);
74+
}
75+
catch (Throwable e)
76+
{
77+
queryFailures.incrementAndGet();
78+
}
79+
}
80+
});
81+
t.start();
82+
cluster.get(1).nodetoolResult("cms", "initialize").asserts().success();
83+
stop.set(true);
84+
t.join();
85+
assertEquals(0, queryFailures.get());
86+
}).run();
87+
}
88+
89+
public static class BBInstaller
90+
{
91+
public static void install(ClassLoader classLoader, int inst)
92+
{
93+
if (inst == 1)
94+
return;
95+
new ByteBuddy().rebase(CassandraIndexSearcher.class)
96+
.method(named("queryIndex"))
97+
.intercept(MethodDelegation.to(BBInterceptor.class))
98+
.make()
99+
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
100+
}
101+
}
102+
103+
public static class BBInterceptor
104+
{
105+
public static UnfilteredRowIterator queryIndex(DecoratedKey indexKey, ReadCommand command, ReadExecutionController executionController, @SuperCall Callable<UnfilteredRowIterator> zuper) throws Exception
106+
{
107+
// this makes it more likely that we decorate the key with one partitioner and execute the SinglePartitionReadCommand with another
108+
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
109+
return zuper.call();
110+
}
111+
}
112+
}

0 commit comments

Comments
 (0)