Skip to content

Commit 784e098

Browse files
authored
IGNITE-28460 Sql. Choose commit partition from list of involved partitions (#7936)
1 parent cdf6ea3 commit 784e098

4 files changed

Lines changed: 140 additions & 68 deletions

File tree

modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java

Lines changed: 24 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import com.github.benmanes.caffeine.cache.Caffeine;
2929
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
30+
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
3031
import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
3132
import java.lang.management.ManagementFactory;
3233
import java.lang.management.ThreadInfo;
@@ -49,7 +50,6 @@
4950
import java.util.concurrent.ConcurrentHashMap;
5051
import java.util.concurrent.ConcurrentLinkedQueue;
5152
import java.util.concurrent.ConcurrentMap;
52-
import java.util.concurrent.ThreadLocalRandom;
5353
import java.util.concurrent.TimeUnit;
5454
import java.util.concurrent.TimeoutException;
5555
import java.util.concurrent.atomic.AtomicBoolean;
@@ -114,15 +114,10 @@
114114
import org.apache.ignite.internal.sql.engine.prepare.ExplainPlan;
115115
import org.apache.ignite.internal.sql.engine.prepare.ExplainablePlan;
116116
import org.apache.ignite.internal.sql.engine.prepare.Fragment;
117-
import org.apache.ignite.internal.sql.engine.prepare.IgniteRelShuttle;
118117
import org.apache.ignite.internal.sql.engine.prepare.KillPlan;
119118
import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
120119
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
121-
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
122120
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
123-
import org.apache.ignite.internal.sql.engine.rel.IgniteTableModify;
124-
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
125-
import org.apache.ignite.internal.sql.engine.rel.SourceAwareIgniteRel;
126121
import org.apache.ignite.internal.sql.engine.schema.IgniteSchemas;
127122
import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
128123
import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
@@ -1292,65 +1287,39 @@ private CompletableFuture<AsyncCursor<InternalSqlRow>> sendFragments(
12921287
}
12931288

12941289
private void enlistPartitions(MappedFragment mappedFragment, InternalTransaction tx) {
1295-
// no need to traverse the tree if fragment has no tables
1296-
if (mappedFragment.fragment().tables().isEmpty()) {
1297-
return;
1298-
}
1299-
1300-
new IgniteRelShuttle() {
1301-
@Override
1302-
public IgniteRel visit(IgniteIndexScan rel) {
1303-
enlist(rel);
1304-
1305-
return super.visit(rel);
1306-
}
1307-
1308-
@Override
1309-
public IgniteRel visit(IgniteTableScan rel) {
1310-
enlist(rel);
1311-
1312-
return super.visit(rel);
1313-
}
1290+
boolean shouldAssignCommitPartition = tx.commitPartition() == null;
1291+
for (Long2ObjectMap.Entry<IgniteTable> entry : mappedFragment.fragment().tables().long2ObjectEntrySet()) {
1292+
long sourceId = entry.getLongKey();
1293+
IgniteTable table = entry.getValue();
13141294

1315-
@Override
1316-
public IgniteRel visit(IgniteTableModify rel) {
1317-
enlist(rel);
1295+
ColocationGroup colocationGroup = mappedFragment.groupsBySourceId().get(sourceId);
1296+
Int2ObjectMap<NodeWithConsistencyToken> assignments = colocationGroup.assignments();
13181297

1319-
return super.visit(rel);
1298+
if (assignments.isEmpty()) {
1299+
continue;
13201300
}
13211301

1322-
private void enlist(int tableId, int zoneId, Int2ObjectMap<NodeWithConsistencyToken> assignments) {
1323-
if (assignments.isEmpty()) {
1324-
return;
1325-
}
1326-
1327-
int partsCnt = assignments.size();
1328-
1329-
tx.assignCommitPartition(new ZonePartitionId(zoneId, ThreadLocalRandom.current().nextInt(partsCnt)));
1330-
1331-
for (Int2ObjectMap.Entry<NodeWithConsistencyToken> partWithToken : assignments.int2ObjectEntrySet()) {
1332-
ZonePartitionId replicationGroupId = new ZonePartitionId(zoneId, partWithToken.getIntKey());
1302+
int tableId = table.id();
1303+
int zoneId = table.zoneId();
13331304

1334-
NodeWithConsistencyToken assignment = partWithToken.getValue();
1335-
1336-
tx.enlist(
1337-
replicationGroupId,
1338-
tableId,
1339-
assignment.name(),
1340-
assignment.enlistmentConsistencyToken()
1341-
);
1342-
}
1305+
if (shouldAssignCommitPartition) {
1306+
tx.assignCommitPartition(new ZonePartitionId(zoneId, assignments.keySet().iterator().nextInt()));
1307+
shouldAssignCommitPartition = false;
13431308
}
13441309

1345-
private void enlist(SourceAwareIgniteRel rel) {
1346-
IgniteTable igniteTable = rel.getTable().unwrap(IgniteTable.class);
1310+
for (Int2ObjectMap.Entry<NodeWithConsistencyToken> partWithToken : assignments.int2ObjectEntrySet()) {
1311+
ZonePartitionId partitionId = new ZonePartitionId(zoneId, partWithToken.getIntKey());
13471312

1348-
ColocationGroup colocationGroup = mappedFragment.groupsBySourceId().get(rel.sourceId());
1349-
Int2ObjectMap<NodeWithConsistencyToken> assignments = colocationGroup.assignments();
1313+
NodeWithConsistencyToken assignment = partWithToken.getValue();
13501314

1351-
enlist(igniteTable.id(), igniteTable.zoneId(), assignments);
1315+
tx.enlist(
1316+
partitionId,
1317+
tableId,
1318+
assignment.name(),
1319+
assignment.enlistmentConsistencyToken()
1320+
);
13521321
}
1353-
}.visit(mappedFragment.fragment().root());
1322+
}
13541323
}
13551324

13561325
private CompletableFuture<Void> close(CancellationReason reason) {

modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TransactionEnlistTest.java

Lines changed: 102 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static org.mockito.ArgumentMatchers.any;
2222
import static org.mockito.ArgumentMatchers.anyInt;
2323
import static org.mockito.ArgumentMatchers.anyLong;
24+
import static org.mockito.ArgumentMatchers.argThat;
25+
import static org.mockito.Mockito.never;
2426
import static org.mockito.Mockito.times;
2527

2628
import java.util.List;
@@ -29,34 +31,44 @@
2931
import java.util.stream.IntStream;
3032
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
3133
import org.apache.ignite.internal.manager.ComponentContext;
34+
import org.apache.ignite.internal.replicator.ZonePartitionId;
3235
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
3336
import org.apache.ignite.internal.sql.engine.InternalSqlRow;
3437
import org.apache.ignite.internal.sql.engine.QueryProcessor;
3538
import org.apache.ignite.internal.sql.engine.SqlProperties;
39+
import org.apache.ignite.internal.sql.engine.exec.mapping.ColocationGroup;
3640
import org.apache.ignite.internal.sql.engine.framework.DataProvider;
3741
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
3842
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
3943
import org.apache.ignite.internal.sql.engine.framework.TestCluster;
4044
import org.apache.ignite.internal.sql.engine.framework.TestNode;
4145
import org.apache.ignite.internal.sql.engine.prepare.QueryMetadata;
4246
import org.apache.ignite.internal.sql.engine.prepare.QueryPlan;
47+
import org.apache.ignite.internal.sql.engine.schema.PartitionCalculator;
48+
import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
4349
import org.apache.ignite.internal.sql.engine.util.InjectQueryCheckerFactory;
4450
import org.apache.ignite.internal.sql.engine.util.QueryChecker;
4551
import org.apache.ignite.internal.sql.engine.util.QueryCheckerExtension;
4652
import org.apache.ignite.internal.sql.engine.util.QueryCheckerFactory;
4753
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
4854
import org.apache.ignite.internal.tx.InternalTransaction;
55+
import org.apache.ignite.internal.type.NativeType;
56+
import org.apache.ignite.internal.type.NativeTypes;
4957
import org.apache.ignite.lang.CancellationToken;
5058
import org.jetbrains.annotations.Nullable;
5159
import org.junit.jupiter.api.AfterAll;
5260
import org.junit.jupiter.api.BeforeAll;
5361
import org.junit.jupiter.api.Test;
5462
import org.junit.jupiter.api.extension.ExtendWith;
63+
import org.junit.jupiter.params.ParameterizedTest;
64+
import org.junit.jupiter.params.provider.ValueSource;
65+
import org.mockito.ArgumentMatcher;
5566
import org.mockito.Mockito;
5667

5768
/** Tests for transaction partition enlistment and commit partition assignment. */
5869
@ExtendWith(QueryCheckerExtension.class)
5970
public class TransactionEnlistTest extends BaseIgniteAbstractTest {
71+
private static final int PARTITIONS_COUNT = 3;
6072
private static final String NODE_NAME1 = "N1";
6173

6274
@InjectQueryCheckerFactory
@@ -72,13 +84,45 @@ static void startCluster() {
7284

7385
//noinspection ConcatenationWithEmptyString
7486
CLUSTER.node("N1").initSchema(""
75-
+ "CREATE ZONE test_zone (partitions 3) storage profiles ['Default'];"
76-
+ "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE test_zone");
87+
+ "CREATE ZONE test_zone (partitions " + PARTITIONS_COUNT + ") storage profiles ['Default'];"
88+
+ "CREATE TABLE t1 (id INT PRIMARY KEY, val INT) ZONE test_zone;");
7789

7890
CLUSTER.setAssignmentsProvider("T1", (partitionCount, b) -> IntStream.range(0, partitionCount)
7991
.mapToObj(i -> List.of("N1"))
8092
.collect(Collectors.toList()));
8193
CLUSTER.setDataProvider("T1", TestBuilders.tableScan(DataProvider.fromCollection(List.of())));
94+
CLUSTER.setUpdatableTable("T1", new UpdatableTable() {
95+
@Override
96+
public TableDescriptor descriptor() {
97+
return null;
98+
}
99+
100+
@Override
101+
public <RowT> CompletableFuture<?> insertAll(ExecutionContext<RowT> ectx, List<RowT> rows, ColocationGroup colocationGroup) {
102+
return nullCompletedFuture();
103+
}
104+
105+
@Override
106+
public <RowT> CompletableFuture<Void> insert(@Nullable InternalTransaction explicitTx, ExecutionContext<RowT> ectx, RowT row) {
107+
return nullCompletedFuture();
108+
}
109+
110+
@Override
111+
public <RowT> CompletableFuture<?> upsertAll(ExecutionContext<RowT> ectx, List<RowT> rows, ColocationGroup colocationGroup) {
112+
return nullCompletedFuture();
113+
}
114+
115+
@Override
116+
public <RowT> CompletableFuture<Boolean> delete(@Nullable InternalTransaction explicitTx, ExecutionContext<RowT> ectx,
117+
RowT key) {
118+
return nullCompletedFuture();
119+
}
120+
121+
@Override
122+
public <RowT> CompletableFuture<?> deleteAll(ExecutionContext<RowT> ectx, List<RowT> rows, ColocationGroup colocationGroup) {
123+
return nullCompletedFuture();
124+
}
125+
});
82126
}
83127

84128
@AfterAll
@@ -104,6 +148,55 @@ void testEnlistCall() {
104148
Mockito.verify(spiedTx, times(2)).enlist(any(), anyInt(), any(), anyLong());
105149
}
106150

151+
@ParameterizedTest
152+
@ValueSource(ints = {1, 2, 3})
153+
void testCommitPartitionChoice(int id) {
154+
NoOpTransaction tx = NoOpTransaction.readWrite("t1", false);
155+
156+
NoOpTransaction spiedTx = Mockito.spy(tx);
157+
158+
assertQuery("UPDATE t1 /*+ no_index */ SET val = 42 WHERE id = ?", spiedTx)
159+
.withParam(id)
160+
.check();
161+
162+
int expectedPartition = expectedPartition(id);
163+
{
164+
ArgumentMatcher<ZonePartitionId> partitionIdMatch = zonePartitionId -> zonePartitionId.partitionId() == expectedPartition;
165+
// We expect the commit partition to be assigned exactly once for the given transaction.
166+
Mockito.verify(spiedTx, times(1))
167+
.assignCommitPartition(argThat(partitionIdMatch));
168+
// An individual partition, on the other hand, will be enlisted once for every source.
169+
// In this particular case: once for the scan node and once for the modify node.
170+
Mockito.verify(spiedTx, times(2))
171+
.enlist(argThat(partitionIdMatch), anyInt(), any(), anyLong());
172+
}
173+
174+
{
175+
// Due to partition pruning, we don't expect any other partition to be enlisted.
176+
// We should not try to assign any other partition as the commit partition either.
177+
ArgumentMatcher<ZonePartitionId> partitionIdMismatch = zonePartitionId -> zonePartitionId.partitionId() != expectedPartition;
178+
Mockito.verify(spiedTx, never())
179+
.assignCommitPartition(argThat(partitionIdMismatch));
180+
Mockito.verify(spiedTx, never())
181+
.enlist(argThat(partitionIdMismatch), anyInt(), any(), anyLong());
182+
}
183+
}
184+
185+
@Test
186+
void testNoCommitPartitionAssignment() {
187+
NoOpTransaction tx = NoOpTransaction.readWrite("t1", false);
188+
tx.assignCommitPartition(new ZonePartitionId(1, 1));
189+
190+
NoOpTransaction spiedTx = Mockito.spy(tx);
191+
192+
assertQuery("UPDATE t1 /*+ no_index */ SET val = 42 WHERE id = ?", spiedTx)
193+
.withParam(1)
194+
.check();
195+
196+
// The transaction already has a commit partition, so no assignment is expected during query processing.
197+
Mockito.verify(spiedTx, never()).assignCommitPartition(any());
198+
}
199+
107200
private static QueryChecker assertQuery(String qry, InternalTransaction tx) {
108201
TestNode testNode = CLUSTER.node(NODE_NAME1);
109202

@@ -149,10 +242,9 @@ public CompletableFuture<AsyncSqlCursor<InternalSqlRow>> queryAsync(
149242
String qry,
150243
Object... params
151244
) {
152-
assert params == null || params.length == 0 : "params are not supported";
153245
assert !prepareOnly : "Expected that the query will only be prepared, but not executed";
154246

155-
AsyncSqlCursor<InternalSqlRow> sqlCursor = node.executeQuery(transaction, qry);
247+
AsyncSqlCursor<InternalSqlRow> sqlCursor = node.executeQuery(transaction, qry, params);
156248

157249
return CompletableFuture.completedFuture(sqlCursor);
158250
}
@@ -169,4 +261,10 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
169261
return nullCompletedFuture();
170262
}
171263
}
264+
265+
private static int expectedPartition(int key) {
266+
var calculator = new PartitionCalculator(PARTITIONS_COUNT, new NativeType[] {NativeTypes.INT32});
267+
calculator.append(key);
268+
return calculator.partition();
269+
}
172270
}

modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ScannableTableSelfTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.ignite.internal.binarytuple.BinaryTuple;
6060
import org.apache.ignite.internal.binarytuple.BinaryTuplePrefix;
6161
import org.apache.ignite.internal.network.InternalClusterNode;
62+
import org.apache.ignite.internal.replicator.ZonePartitionId;
6263
import org.apache.ignite.internal.schema.BinaryRow;
6364
import org.apache.ignite.internal.schema.BinaryRowEx;
6465
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
@@ -80,6 +81,7 @@
8081
import org.apache.ignite.internal.table.TxContext;
8182
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
8283
import org.apache.ignite.internal.type.StructNativeType;
84+
import org.apache.ignite.tx.Transaction;
8385
import org.hamcrest.Matchers;
8486
import org.jetbrains.annotations.Nullable;
8587
import org.junit.jupiter.api.Named;
@@ -220,7 +222,7 @@ private static Stream<Arguments> indexScanParameters() {
220222
for (Bound leftBound : Bound.values()) {
221223
for (Bound rightBound : Bound.values()) {
222224
params.add(Arguments.of(NoOpTransaction.readOnly("RO", false), leftBound, rightBound));
223-
params.add(Arguments.of(NoOpTransaction.readWrite("RW", false), leftBound, rightBound));
225+
params.add(Arguments.of(rwWithCommitPartition(), leftBound, rightBound));
224226
}
225227
}
226228

@@ -483,10 +485,16 @@ public void testIndexLookupError(NoOpTransaction tx) {
483485
private static Stream<Arguments> transactions() {
484486
return Stream.of(
485487
Arguments.of(Named.of("Read-only transaction", NoOpTransaction.readOnly("RO", false))),
486-
Arguments.of(Named.of("Read-write transaction", NoOpTransaction.readWrite("RW", false)))
488+
Arguments.of(Named.of("Read-write transaction", rwWithCommitPartition()))
487489
);
488490
}
489491

492+
private static Transaction rwWithCommitPartition() {
493+
NoOpTransaction tx = NoOpTransaction.readWrite("RW", false);
494+
tx.assignCommitPartition(new ZonePartitionId(1, 1));
495+
return tx;
496+
}
497+
490498
private class Tester {
491499

492500
final ScannableTable scannableTable;

modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/NoOpTransaction.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.UUID;
2727
import java.util.concurrent.CompletableFuture;
2828
import java.util.concurrent.TimeoutException;
29+
import java.util.concurrent.atomic.AtomicReference;
2930
import org.apache.ignite.internal.hlc.HybridTimestamp;
3031
import org.apache.ignite.internal.network.ClusterNodeImpl;
3132
import org.apache.ignite.internal.network.InternalClusterNode;
@@ -42,12 +43,8 @@
4243
* Dummy transaction that should be used as a mock transaction for execution tests.
4344
*/
4445
public final class NoOpTransaction implements InternalTransaction {
45-
private static final int ZONE_ID = 1;
46-
4746
private static final int TABLE_ID = 2;
4847

49-
private static final int PARTITION_ID = 2;
50-
5148
private final UUID id;
5249

5350
private final HybridTimestamp hybridTimestamp = new HybridTimestamp(1, 1)
@@ -57,7 +54,7 @@ public final class NoOpTransaction implements InternalTransaction {
5754

5855
private final PendingTxPartitionEnlistment enlistment;
5956

60-
private final ZonePartitionId groupId = new ZonePartitionId(ZONE_ID, PARTITION_ID);
57+
private final AtomicReference<ZonePartitionId> commitPartition = new AtomicReference<>();
6158

6259
private final boolean implicit;
6360

@@ -178,12 +175,12 @@ public TxState state() {
178175

179176
@Override
180177
public boolean assignCommitPartition(ZonePartitionId replicationGroupId) {
181-
return true;
178+
return commitPartition.compareAndSet(null, replicationGroupId);
182179
}
183180

184181
@Override
185182
public ZonePartitionId commitPartition() {
186-
return groupId;
183+
return commitPartition.get();
187184
}
188185

189186
@Override

0 commit comments

Comments
 (0)