diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlAggBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlAggBenchmark.java index e767c1936deb4..c4ef1bee72b18 100644 --- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlAggBenchmark.java +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlAggBenchmark.java @@ -18,7 +18,16 @@ package org.apache.ignite.internal.benchmarks.jmh.sql; import java.util.List; +import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; @@ -26,6 +35,12 @@ /** * Benchmark aggregate SQL queries. */ +@Fork(1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 3, time = 5) +@Measurement(iterations = 3, time = 5) +@State(Scope.Benchmark) public class JmhSqlAggBenchmark extends JmhSqlAbstractBenchmark { /** * Query with group by and aggregate. diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlCorrelateBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlCorrelateBenchmark.java index 07004ae63ee41..64d8a3eab7ec9 100644 --- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlCorrelateBenchmark.java +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlCorrelateBenchmark.java @@ -19,7 +19,16 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; @@ -27,6 +36,12 @@ /** * Benchmark correlated SQL queries. */ +@Fork(1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 3, time = 5) +@Measurement(iterations = 3, time = 5) +@State(Scope.Benchmark) public class JmhSqlCorrelateBenchmark extends JmhSqlAbstractBenchmark { /** * Query with correlated subquery. diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlDmlBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlDmlBenchmark.java index 6cdb00ff48ed1..ee809a7bf480e 100644 --- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlDmlBenchmark.java +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlDmlBenchmark.java @@ -19,8 +19,17 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteDataStreamer; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; @@ -28,6 +37,12 @@ /** * Benchmark DML queries. */ +@Fork(1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 3, time = 5) +@Measurement(iterations = 3, time = 5) +@State(Scope.Benchmark) public class JmhSqlDmlBenchmark extends JmhSqlAbstractBenchmark { /** * Initiate new cache. diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlInsertBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlInsertBenchmark.java new file mode 100644 index 0000000000000..578dc1d8bab94 --- /dev/null +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlInsertBenchmark.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmarks.jmh.sql; + +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import static java.lang.String.format; +import static java.util.stream.Collectors.joining; + +/** + * Benchmark for insertion operation, comparing SQL APIs. + */ +@State(Scope.Benchmark) +@Fork(1) +@Threads(1) +@Warmup(iterations = 10, time = 2) +@Measurement(iterations = 10, time = 2) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +public class JmhSqlInsertBenchmark extends JmhSqlAbstractBenchmark { + /** */ + private int id; + + /** */ + private static final String FIELD_VAL = "a".repeat(100); + + /** */ + private static final String TABLE_NAME = "dept"; + + /** */ + private String insertStr; + + /** */ + private String multiInsertStr; + + /** + * Initiate new tables. + */ + @Override public void setup() { + super.setup(); + + insertStr = createInsertStatement(); + multiInsertStr = createMultiInsertStatement(); + + executeSql("CREATE TABLE " + TABLE_NAME + + "(ycsb_key int PRIMARY KEY," + + "field1 varchar(100)," + + "field2 varchar(100)," + + "field3 varchar(100)," + + "field4 varchar(100)," + + "field5 varchar(100)," + + "field6 varchar(100)," + + "field7 varchar(100)," + + "field8 varchar(100)," + + "field9 varchar(100)," + + "field10 varchar(100))" + ); + } + + /** + * Benchmark for SQL insert via embedded client. + */ + @Benchmark + public void sqlSimpleInsert() { + executeSql(insertStr, id++); + } + + /** + * Benchmark for batch SQL insert via embedded client. + */ + @Benchmark + public void sqlBatchInsert() { + id += 2; + + executeSql(multiInsertStr, id, id + 1); + } + + /** + * Run benchmarks. + * + * @param args Args. + * @throws Exception Exception. + */ + public static void main(String[] args) throws Exception { + final Options options = new OptionsBuilder() + .include(JmhSqlInsertBenchmark.class.getSimpleName()) + .build(); + + new Runner(options).run(); + } + + /** */ + private static String createInsertStatement() { + /** */ + String insertQryTemplate = "insert into %s(%s, %s) values(?, %s)"; + + String fieldsQ = IntStream.range(1, 11).mapToObj(i -> "field" + i).collect(joining(",")); + String valQ = IntStream.range(1, 11).mapToObj(i -> "'" + FIELD_VAL + "'").collect(joining(",")); + + return format(insertQryTemplate, TABLE_NAME, "ycsb_key", fieldsQ, valQ); + } + + /** */ + private static String createMultiInsertStatement() { + /** */ + String insertQryTemplate = "insert into %s(%s, %s) values(?, %s), (?, %s)"; + + String fieldsQ = IntStream.range(1, 11).mapToObj(i -> "field" + i).collect(joining(",")); + String valQ = IntStream.range(1, 11).mapToObj(i -> "'" + FIELD_VAL + "'").collect(joining(",")); + + return format(insertQryTemplate, TABLE_NAME, "ycsb_key", fieldsQ, valQ, valQ); + } +} diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlScanBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlScanBenchmark.java index 42080465b805c..6c888815e8308 100644 --- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlScanBenchmark.java +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlScanBenchmark.java @@ -19,7 +19,16 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; @@ -27,6 +36,12 @@ /** * Benchmark scan SQL queries. */ +@Fork(1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 3, time = 5) +@Measurement(iterations = 3, time = 5) +@State(Scope.Benchmark) public class JmhSqlScanBenchmark extends JmhSqlAbstractBenchmark { /** * Query unique value (full scan). diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlSetOpBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlSetOpBenchmark.java index 09f8891218b78..b4b7a50221a8c 100644 --- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlSetOpBenchmark.java +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlSetOpBenchmark.java @@ -19,7 +19,16 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.profile.GCProfiler; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.options.Options; @@ -28,6 +37,12 @@ /** * Benchmark set op SQL queries. */ +@Fork(1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 3, time = 5) +@Measurement(iterations = 3, time = 5) +@State(Scope.Benchmark) public class JmhSqlSetOpBenchmark extends JmhSqlAbstractBenchmark { /** * Query with EXCEPT set op. diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlSortBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlSortBenchmark.java index 8493a2a91389e..48ba21053b4d4 100644 --- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlSortBenchmark.java +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlSortBenchmark.java @@ -19,7 +19,16 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; @@ -27,6 +36,12 @@ /** * Benchmark sort SQL queries. */ +@Fork(1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 3, time = 5) +@Measurement(iterations = 3, time = 5) +@State(Scope.Benchmark) public class JmhSqlSortBenchmark extends JmhSqlAbstractBenchmark { /** * Query with sorting (full set). diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlUdfBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlUdfBenchmark.java index c3d24ca055b27..5a606b447dd18 100644 --- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlUdfBenchmark.java +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlUdfBenchmark.java @@ -18,9 +18,18 @@ package org.apache.ignite.internal.benchmarks.jmh.sql; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; @@ -28,6 +37,12 @@ /** * Benchmark user defined functions in SQL queries. */ +@Fork(1) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.SECONDS) +@Warmup(iterations = 3, time = 5) +@Measurement(iterations = 3, time = 5) +@State(Scope.Benchmark) public class JmhSqlUdfBenchmark extends JmhSqlAbstractBenchmark { /** {@inheritDoc} */ @Override protected CacheConfiguration cacheConfiguration() { diff --git a/modules/calcite/pom.xml b/modules/calcite/pom.xml index fb42c00c24094..5daf56bf1e633 100644 --- a/modules/calcite/pom.xml +++ b/modules/calcite/pom.xml @@ -37,7 +37,6 @@ 1.26.0 1.40.0 - 3.10.0 1.0.1 2.8.2 3.1.12 @@ -94,12 +93,6 @@ failureaccess - - org.checkerframework - checker-qual - ${checker.version} - - org.codehaus.janino commons-compiler diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java index fcf2b8ebd3315..f055981440e86 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java @@ -241,6 +241,8 @@ private void flushTuples(boolean force) throws Exception { if (keepBinaryMode) cache = cache.keepBinary(); + cache = cache.withCalciteEngine(); + if (tx == null) invokeOutsideTransaction(tuples, cache); else diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PublicApiIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PublicApiIntegrationTest.java new file mode 100644 index 0000000000000..89126c04a29ca --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/PublicApiIntegrationTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.integration; + +import java.util.List; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.calcite.CalciteQueryEngineConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.SqlConfiguration; +import org.apache.ignite.transactions.Transaction; +import org.junit.Test; + +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; + +/** Public api integration tests. */ +public class PublicApiIntegrationTest extends AbstractBasicIntegrationTest { + /** */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setSqlConfiguration(new SqlConfiguration() + .setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration().setDefault(true))); + + return cfg; + } + + /** */ + @Test + public void testSimpleInsert() { + IgniteCache cache = client.createCache(DEFAULT_CACHE_NAME); + + runQuery(0, nodeCount() * 10, false, cache); + + cache = cache.withKeepBinary(); + + runQuery(nodeCount() * 10, 2 * nodeCount() * 10, false, cache); + + List> res = cache.query(new SqlFieldsQuery("SELECT * FROM emp")).getAll(); + + assertEquals("Unexpected result set size: " + res.size(), 1, res.size()); + } + + /** */ + @Test + public void testTxInsert() { + CacheConfiguration ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + + IgniteCache cache = client.createCache(ccfg); + + runQuery(0, nodeCount() * 10, true, cache); + + cache = cache.withKeepBinary(); + + runQuery(nodeCount() * 10, 2 * nodeCount() * 10, true, cache); + + List> res = cache.query(new SqlFieldsQuery("SELECT * FROM emp")).getAll(); + + assertEquals("Unexpected result set size: " + res.size(), 1, res.size()); + } + + /** */ + private void runQuery(int begin, int end, boolean transactional, IgniteCache cache) { + Transaction tx = null; + + cache.query(new SqlFieldsQuery("CREATE TABLE IF NOT EXISTS emp(empid INTEGER, deptid INTEGER, name VARCHAR, salary INTEGER, " + + "PRIMARY KEY(empid, deptid)) WITH \"AFFINITY_KEY=deptid" + (transactional ? ", ATOMICITY=transactional" : "") + "\"")) + .getAll(); + + if (transactional) { + //noinspection resource + tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED); + } + + for (int i = begin; i < end; i++) { + cache.query(new SqlFieldsQuery("INSERT INTO emp (empid, deptid, name, salary) VALUES (?, ?, ?, ?)").setArgs( + i, i % 2, "Employee " + i, i / 10)).getAll(); + + cache.query(new SqlFieldsQuery("UPDATE emp SET name = '' WHERE empid = ? AND deptid = ?").setArgs(i, i % 2)).getAll(); + cache.query(new SqlFieldsQuery("DELETE FROM emp WHERE empid = ?").setArgs(i - 1)).getAll(); + } + + if (transactional) + tx.commit(); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java index f4e123c6a7c40..39726224cf074 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.OperatorsExtensionIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.PartitionPruneTest; import org.apache.ignite.internal.processors.query.calcite.integration.PartitionsReservationIntegrationTest; +import org.apache.ignite.internal.processors.query.calcite.integration.PublicApiIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.QueryBlockingTaskExecutorIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.QueryEngineConfigurationIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.QueryMetadataIntegrationTest; @@ -183,6 +184,7 @@ CacheWithInterceptorIntegrationTest.class, TxThreadLockingTest.class, SelectByKeyFieldTest.class, + PublicApiIntegrationTest.class, }) public class IntegrationTestSuite { } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java index b6470c60edc15..8ce5dda64a130 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java @@ -34,11 +34,11 @@ public class CacheOperationContext implements Serializable { /** */ private static final long serialVersionUID = 0L; - /** Skip store. */ + /** Skip store flag. */ @GridToStringInclude private final boolean skipStore; - /** Skip store. */ + /** Skip read through flag. */ @GridToStringInclude private final boolean skipReadThrough; @@ -46,49 +46,38 @@ public class CacheOperationContext implements Serializable { @GridToStringInclude private final boolean noRetries; - /** */ + /** Recovery flag. */ private final boolean recovery; - /** Read-repair strategy. */ - private final ReadRepairStrategy readRepairStrategy; + /** Read-repair strategy.*/ + private final @Nullable ReadRepairStrategy readRepairStrategy; /** Keep binary flag. */ private final boolean keepBinary; /** Expiry policy. */ - private final ExpiryPolicy expiryPlc; + private final @Nullable ExpiryPolicy expiryPlc; /** Data center Id. */ - private final Byte dataCenterId; + private final @Nullable Byte dataCenterId; /** Application attributes. */ - private final Map appAttrs; + private final @Nullable Map appAttrs; - /** - * Constructor with default values. - */ - public CacheOperationContext() { - skipStore = false; - skipReadThrough = false; - keepBinary = false; - expiryPlc = null; - noRetries = false; - recovery = false; - readRepairStrategy = null; - dataCenterId = null; - appAttrs = null; - } + /** Calcite engine call flag. */ + private final boolean calciteEngineCall; /** - * @param skipStore Skip store flag. - * @param skipReadThrough Skip read-through cache store flag. - * @param keepBinary Keep binary flag. - * @param expiryPlc Expiry policy. - * @param dataCenterId Data center id. + * @param skipStore Skip store flag. + * @param skipReadThrough Skip read-through cache store flag. + * @param keepBinary Keep binary flag. + * @param expiryPlc Expiry policy. + * @param dataCenterId Data center id. * @param readRepairStrategy Read-repair strategy. - * @param appAttrs Application attributes. + * @param appAttrs Application attributes. + * @param calciteEngineCall Calcite engine call flag. */ - public CacheOperationContext( + private CacheOperationContext( boolean skipStore, boolean skipReadThrough, boolean keepBinary, @@ -97,7 +86,8 @@ public CacheOperationContext( @Nullable Byte dataCenterId, boolean recovery, @Nullable ReadRepairStrategy readRepairStrategy, - @Nullable Map appAttrs + @Nullable Map appAttrs, + boolean calciteEngineCall ) { this.skipStore = skipStore; this.skipReadThrough = skipReadThrough; @@ -108,38 +98,26 @@ public CacheOperationContext( this.recovery = recovery; this.readRepairStrategy = readRepairStrategy; this.appAttrs = appAttrs; + this.calciteEngineCall = calciteEngineCall; } /** - * @return Keep binary flag. + * Helper. */ - public boolean isKeepBinary() { - return keepBinary; + public static CacheOperationContext instance() { + return new Builder().build(); } /** - * @return {@code True} if data center id is set otherwise {@code false}. + * @return keepBinary flag. */ - public boolean hasDataCenterId() { - return dataCenterId != null; + public boolean isKeepBinary() { + return keepBinary; } - /** - * See {@link IgniteInternalCache#keepBinary()}. - * - * @return New instance of CacheOperationContext with keep binary flag. - */ - public CacheOperationContext keepBinary() { - return new CacheOperationContext( - skipStore, - skipReadThrough, - true, - expiryPlc, - noRetries, - dataCenterId, - recovery, - readRepairStrategy, - appAttrs); + /** Context with keepBinary flag. */ + public CacheOperationContext withKeepBinary() { + return builder(this).keepBinary(true).build(); } /** @@ -151,214 +129,291 @@ public CacheOperationContext keepBinary() { return dataCenterId; } - /** - * @return Skip store. - */ - public boolean skipStore() { - return skipStore; + /** Context with dataCenterId. */ + public CacheOperationContext dataCenterId(Byte dataCenterId) { + return builder(this).dataCenterId(dataCenterId).build(); } /** - * See {@link IgniteInternalCache#setSkipStore(boolean)}. - * - * @param skipStore Skip store flag. - * @return New instance of CacheOperationContext with skip store flag. + * @return Partition recover flag. */ - public CacheOperationContext setSkipStore(boolean skipStore) { - return new CacheOperationContext( - skipStore, - skipReadThrough, - keepBinary, - expiryPlc, - noRetries, - dataCenterId, - recovery, - readRepairStrategy, - appAttrs); + public boolean recovery() { + return recovery; } - /** @return Skip read-through cache store. */ - public boolean skipReadThrough() { - return skipReadThrough; + /** Context with recovery flag. */ + public CacheOperationContext withRecovery() { + return builder(this).recovery(true).build(); } /** - * See {@link IgniteInternalCache#withApplicationAttributes(Map)}. - * - * @return New instance of CacheOperationContext with new application attributes. + * @return Read Repair strategy. */ - public CacheOperationContext withApplicationAttributes(Map attrs) { - return new CacheOperationContext( - skipStore, - skipReadThrough, - keepBinary, - expiryPlc, - noRetries, - dataCenterId, - recovery, - readRepairStrategy, - Collections.unmodifiableMap(attrs)); + @Nullable public ReadRepairStrategy readRepairStrategy() { + return readRepairStrategy; } - /** - * See {@link IgniteInternalCache#withSkipReadThrough()}. - * - * @return New instance of CacheOperationContext with skip store flag. - */ - public CacheOperationContext withSkipReadThrough() { - return new CacheOperationContext( - skipStore, - true, - keepBinary, - expiryPlc, - noRetries, - dataCenterId, - recovery, - readRepairStrategy, - appAttrs); + /** Context with read repair strategy. */ + public CacheOperationContext readRepairStrategy(ReadRepairStrategy strategy) { + return builder(this).readRepairStrategy(strategy).build(); } /** - * @return {@link ExpiryPolicy} associated with this projection. + * @return No retries flag. */ - @Nullable public ExpiryPolicy expiry() { - return expiryPlc; + public boolean noRetries() { + return noRetries; } - /** - * See {@link IgniteInternalCache#withExpiryPolicy(ExpiryPolicy)}. - * - * @param plc {@link ExpiryPolicy} to associate with this projection. - * @return New instance of CacheOperationContext with skip store flag. - */ - public CacheOperationContext withExpiryPolicy(ExpiryPolicy plc) { - return new CacheOperationContext( - skipStore, - skipReadThrough, - keepBinary, - plc, - noRetries, - dataCenterId, - recovery, - readRepairStrategy, - appAttrs); + /** Context with noRetries flag. */ + public CacheOperationContext withNoRetries() { + return builder(this).noRetries(true).build(); } /** - * @param noRetries No retries flag. - * @return Operation context. + * @return Application attributes. */ - public CacheOperationContext setNoRetries(boolean noRetries) { - return new CacheOperationContext( - skipStore, - skipReadThrough, - keepBinary, - expiryPlc, - noRetries, - dataCenterId, - recovery, - readRepairStrategy, - appAttrs); + @Nullable public Map applicationAttributes() { + return appAttrs; } - /** - * @param dataCenterId Data center id. - * @return Operation context. - */ - public CacheOperationContext setDataCenterId(byte dataCenterId) { - return new CacheOperationContext( - skipStore, - skipReadThrough, - keepBinary, - expiryPlc, - noRetries, - dataCenterId, - recovery, - readRepairStrategy, - appAttrs); + /** Context with application attributes. */ + public CacheOperationContext withApplicationAttributes(Map attrs) { + return builder(this).applicationAttributes(Collections.unmodifiableMap(attrs)).build(); } /** - * @param recovery Recovery flag. - * @return New instance of CacheOperationContext with recovery flag. + * @return Skip store. */ - public CacheOperationContext setRecovery(boolean recovery) { - return new CacheOperationContext( - skipStore, - skipReadThrough, - keepBinary, - expiryPlc, - noRetries, - dataCenterId, - recovery, - readRepairStrategy, - appAttrs); + public boolean skipStore() { + return skipStore; } - /** - * @param readRepairStrategy Read Repair strategy. - * @return New instance of CacheOperationContext with Read Repair flag. - */ - public CacheOperationContext setReadRepairStrategy(ReadRepairStrategy readRepairStrategy) { - return new CacheOperationContext( - skipStore, - skipReadThrough, - keepBinary, - expiryPlc, - noRetries, - dataCenterId, - recovery, - readRepairStrategy, - appAttrs); + /** Context with skipStore flag. */ + public CacheOperationContext withSkipStore() { + return builder(this).skipStore(true).build(); } /** - * @param appAttrs Application attributes. - * @return New instance of CacheOperationContext with application attributes. + * @return Skip read-through cache store. */ - public CacheOperationContext setApplicationAttributes(Map appAttrs) { - return new CacheOperationContext( - skipStore, - skipReadThrough, - keepBinary, - expiryPlc, - noRetries, - dataCenterId, - recovery, - readRepairStrategy, - new HashMap<>(appAttrs)); + public boolean skipReadThrough() { + return skipReadThrough; } - /** - * @return Partition recover flag. - */ - public boolean recovery() { - return recovery; + /** Context with {@link CacheOperationContext#skipReadThrough} flag. */ + public CacheOperationContext withSkipReadThrough() { + return builder(this).skipReadThrough(true).build(); + } + + /** @return Calcite engine execution flag. */ + public boolean calciteEngine() { + return calciteEngineCall; + } + + /** Context with {@link CacheOperationContext#calciteEngine} flag. */ + public CacheOperationContext withCalciteEngine() { + return builder(this).calciteEngine(true).build(); } /** - * @return Read Repair strategy. + * @return {@link ExpiryPolicy} associated with this projection. */ - public ReadRepairStrategy readRepairStrategy() { - return readRepairStrategy; + @Nullable public ExpiryPolicy expiry() { + return expiryPlc; + } + + /** Context with {@link CacheOperationContext#expiryPlc}. */ + public CacheOperationContext withExpiryPolicy(ExpiryPolicy plc) { + return builder(this).expiryPolicy(plc).build(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheOperationContext.class, this); } /** - * @return No retries flag. + * Creates the builder for cache operations context. + * + * @return Builder for cache operations context. */ - public boolean noRetries() { - return noRetries; + public static Builder builder(CacheOperationContext ctx) { + return new Builder(ctx); } /** - * @return Application attributes. + * Creates the builder from existing context. + * + * @return Builder for cache operations context. */ - public Map applicationAttributes() { - return appAttrs; + public static Builder builder() { + return new Builder(); } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheOperationContext.class, this); + /** Cache operations context builder. */ + public static class Builder { + /** Skip store. */ + @GridToStringInclude + private boolean skipStore; + + /** Skip read through. */ + @GridToStringInclude + private boolean skipReadThrough; + + /** No retries flag. */ + @GridToStringInclude + private boolean noRetries; + + /** Recovery flag. */ + private boolean recovery; + + /** Read-repair strategy. */ + private ReadRepairStrategy readRepairStrategy; + + /** Keep binary flag. */ + private boolean keepBinary; + + /** Expiry policy. */ + private ExpiryPolicy expiryPlc; + + /** Data center Id. */ + private Byte dataCenterId; + + /** Application attributes. */ + private Map appAttrs; + + /** Calcite engine execution flag. */ + private boolean calciteEngine; + + /** */ + Builder() { + // No context. + } + + /** */ + Builder(CacheOperationContext ctx) { + skipStore = ctx.skipStore; + skipReadThrough = ctx.skipReadThrough; + noRetries = ctx.noRetries; + recovery = ctx.recovery; + readRepairStrategy = ctx.readRepairStrategy; + keepBinary = ctx.keepBinary; + expiryPlc = ctx.expiryPlc; + dataCenterId = ctx.dataCenterId; + appAttrs = ctx.appAttrs; + calciteEngine = ctx.calciteEngineCall; + } + + /** + * CacheOperationContext with keepBinary flag. + * + * @see IgniteInternalCache#keepBinary() + */ + public Builder keepBinary(boolean keepBinary) { + this.keepBinary = keepBinary; + return this; + } + + /** + * CacheOperationContext with skipStore flag. + * + * @see IgniteInternalCache#skipStore() + */ + public Builder skipStore(boolean skipStore) { + this.skipStore = skipStore; + return this; + } + + /** + * CacheOperationContext with attributes. + * + * @see IgniteInternalCache#withApplicationAttributes(Map) + */ + public Builder applicationAttributes(Map attrs) { + appAttrs = Collections.unmodifiableMap(new HashMap<>(attrs)); + return this; + } + + /** + * CacheOperationContext with skip read through flag. + * + * @see IgniteInternalCache#withSkipReadThrough() + */ + public Builder skipReadThrough(boolean skipReadThrough) { + this.skipReadThrough = skipReadThrough; + return this; + } + + /** + * CacheOperationContext with calcite execution flag. + * + * @see IgniteInternalCache#withCalciteEngine() + */ + public Builder calciteEngine(boolean calciteEngine) { + this.calciteEngine = calciteEngine; + return this; + } + + /** + * CacheOperationContext with expiry policy. + * + * @see IgniteInternalCache#withExpiryPolicy(ExpiryPolicy) + */ + public Builder expiryPolicy(ExpiryPolicy expiryPlc) { + this.expiryPlc = expiryPlc; + return this; + } + + /** + * CacheOperationContext with no retries flag. + */ + public Builder noRetries(boolean noRetries) { + this.noRetries = noRetries; + return this; + } + + /** + * CacheOperationContext with Data center id. + */ + public Builder dataCenterId(Byte dataCenterId) { + this.dataCenterId = dataCenterId; + return this; + } + + /** + * CacheOperationContext with recovery flag. + */ + public Builder recovery(boolean recovery) { + this.recovery = recovery; + return this; + } + + /** + * CacheOperationContext with read repair strategy. + */ + public Builder readRepairStrategy(ReadRepairStrategy readRepairStrategy) { + this.readRepairStrategy = readRepairStrategy; + return this; + } + + /** + * Builds cache operations context options. + * + * @return Cache operations context options. + */ + public CacheOperationContext build() { + return new CacheOperationContext( + skipStore, + skipReadThrough, + keepBinary, + expiryPlc, + noRetries, + dataCenterId, + recovery, + readRepairStrategy, + appAttrs, + calciteEngine); + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java index fba0e990c5e97..5af8bba0c0864 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java @@ -166,7 +166,7 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) { if (skip) return this; - return new GatewayProtectedCacheProxy<>(delegate, opCtx.setSkipStore(true), lock); + return new GatewayProtectedCacheProxy<>(delegate, opCtx.withSkipStore(), lock); } finally { onLeave(opGate); @@ -180,7 +180,7 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) { CacheOperationGate opGate = onEnter(); try { - return new GatewayProtectedCacheProxy<>(delegate, opCtx.setApplicationAttributes(appAttrs), lock); + return new GatewayProtectedCacheProxy<>(delegate, opCtx.withApplicationAttributes(appAttrs), lock); } finally { onLeave(opGate); @@ -197,7 +197,7 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) { if (noRetries) return this; - return new GatewayProtectedCacheProxy<>(delegate, opCtx.setNoRetries(true), lock); + return new GatewayProtectedCacheProxy<>(delegate, opCtx.withNoRetries(), lock); } finally { onLeave(opGate); @@ -214,7 +214,7 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) { if (recovery) return this; - return new GatewayProtectedCacheProxy<>(delegate, opCtx.setRecovery(true), lock); + return new GatewayProtectedCacheProxy<>(delegate, opCtx.withRecovery(), lock); } finally { onLeave(opGate); @@ -244,7 +244,7 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) { if (opCtx.readRepairStrategy() == strategy) return this; - return new GatewayProtectedCacheProxy<>(delegate, opCtx.setReadRepairStrategy(strategy), lock); + return new GatewayProtectedCacheProxy<>(delegate, opCtx.readRepairStrategy(strategy), lock); } finally { onLeave(opGate); @@ -261,7 +261,7 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) { CacheOperationGate opGate = onEnter(); try { - return new GatewayProtectedCacheProxy<>((IgniteCacheProxy)delegate, opCtx.keepBinary(), lock); + return new GatewayProtectedCacheProxy<>((IgniteCacheProxy)delegate, opCtx.withKeepBinary(), lock); } finally { onLeave(opGate); @@ -278,7 +278,7 @@ public void setCacheManager(org.apache.ignite.cache.CacheManager cacheMgr) { if (prevDataCenterId != null && dataCenterId == prevDataCenterId) return this; - return new GatewayProtectedCacheProxy<>(delegate, opCtx.setDataCenterId(dataCenterId), lock); + return new GatewayProtectedCacheProxy<>(delegate, opCtx.dataCenterId(dataCenterId), lock); } finally { onLeave(opGate); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 410bac3545fc2..369537432f116 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -466,59 +466,54 @@ public void active(boolean active) { } /** {@inheritDoc} */ - @Override public final GridCacheProxyImpl setSkipStore(boolean skipStore) { - CacheOperationContext opCtx = new CacheOperationContext( - true, - false, - false, - null, - false, - null, - false, - null, - null); + @Override public IgniteInternalCache withSkipStore() { + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + if (opCtx == null) + opCtx = CacheOperationContext.builder().skipStore(true).build(); + else { + if (!opCtx.skipStore()) + opCtx = opCtx.withSkipStore(); + } return new GridCacheProxyImpl<>(ctx, this, opCtx); } /** {@inheritDoc} */ @Override public IgniteInternalCache withSkipReadThrough() { - CacheOperationContext opCtx = this.ctx.operationContextPerCall(); + CacheOperationContext opCtx = ctx.operationContextPerCall(); - if (opCtx == null) { - opCtx = new CacheOperationContext( - false, - true, - false, - null, - false, - null, - false, - null, - null); + if (opCtx == null) + opCtx = CacheOperationContext.builder().skipReadThrough(true).build(); + else { + if (!opCtx.skipReadThrough()) + opCtx = opCtx.withSkipReadThrough(); } - else - opCtx = opCtx.withSkipReadThrough(); - return new GridCacheProxyImpl<>(this.ctx, this, opCtx); + return new GridCacheProxyImpl<>(ctx, this, opCtx); } + /** {@inheritDoc} */ + @Override public IgniteInternalCache withCalciteEngine() { + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + if (opCtx == null) + opCtx = CacheOperationContext.builder().calciteEngine(true).build(); + else { + if (!opCtx.calciteEngine()) + opCtx = opCtx.withCalciteEngine(); + } + + return new GridCacheProxyImpl<>(ctx, this, opCtx); + } + + /** @return New internal cache instance based on this one, but with application attributes. */ @Override public GridCacheProxyImpl withApplicationAttributes(Map attrs) { CacheOperationContext opCtx = ctx.operationContextPerCall(); - if (opCtx == null) { - opCtx = new CacheOperationContext( - false, - false, - false, - null, - false, - null, - false, - null, - new HashMap<>(attrs)); - } + if (opCtx == null) + opCtx = CacheOperationContext.builder().applicationAttributes(attrs).build(); else opCtx = opCtx.withApplicationAttributes(attrs); @@ -527,16 +522,7 @@ public void active(boolean active) { /** {@inheritDoc} */ @Override public final GridCacheProxyImpl keepBinary() { - CacheOperationContext opCtx = new CacheOperationContext( - false, - false, - true, - null, - false, - null, - false, - null, - null); + CacheOperationContext opCtx = CacheOperationContext.builder().keepBinary(true).build(); return new GridCacheProxyImpl<>((GridCacheContext)ctx, (GridCacheAdapter)this, opCtx); } @@ -550,32 +536,21 @@ public void active(boolean active) { @Override public final GridCacheProxyImpl withExpiryPolicy(ExpiryPolicy plc) { assert !CU.isUtilityCache(ctx.name()); - CacheOperationContext opCtx = new CacheOperationContext( - false, - false, - false, - plc, - false, - null, - false, - null, - null); + CacheOperationContext opCtx = CacheOperationContext.builder().expiryPolicy(plc).build(); return new GridCacheProxyImpl<>(ctx, this, opCtx); } /** {@inheritDoc} */ @Override public final IgniteInternalCache withNoRetries() { - CacheOperationContext opCtx = new CacheOperationContext( - false, - false, - false, - null, - true, - null, - false, - null, - null); + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + if (opCtx == null) + opCtx = CacheOperationContext.builder().noRetries(true).build(); + else { + if (!opCtx.noRetries()) + opCtx = opCtx.withNoRetries(); + } return new GridCacheProxyImpl<>(ctx, this, opCtx); } @@ -4478,7 +4453,7 @@ private IgniteInternalFuture repairAtomicAsync( @Override public Boolean call() throws IgniteCheckedException { CacheOperationContext prevOpCtx = ctx.operationContextPerCall(); - ctx.operationContextPerCall(opCtx.keepBinary()); + ctx.operationContextPerCall(opCtx.withKeepBinary()); try { return invoke((K)key, new AtomicReadRepairEntryProcessor<>(correctedVal, primVer)).get(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index d06f64e039a77..3cae628343a62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -251,25 +251,20 @@ public IgniteInternalCache delegate() { } /** {@inheritDoc} */ - @Override public GridCacheProxyImpl setSkipStore(boolean skipStore) { + @Override public GridCacheProxyImpl withSkipStore() { CacheOperationContext prev = gate.enter(opCtx); try { - if (opCtx != null && opCtx.skipStore() == skipStore) - return this; + if (opCtx != null) { + if (opCtx.skipStore()) + return this; + else + opCtx = opCtx.withSkipStore(); + } + else + opCtx = CacheOperationContext.builder().skipStore(true).build(); - return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.setSkipStore(skipStore) : - new CacheOperationContext( - skipStore, - false, - false, - null, - false, - null, - false, - null, - null)); + return new GridCacheProxyImpl<>(ctx, delegate, opCtx); } finally { gate.leave(prev); @@ -281,21 +276,16 @@ public IgniteInternalCache delegate() { CacheOperationContext prev = gate.enter(opCtx); try { - if (opCtx != null && opCtx.skipReadThrough()) - return this; + if (opCtx != null) { + if (opCtx.skipReadThrough()) + return this; + else + opCtx = opCtx.withSkipReadThrough(); + } + else + opCtx = CacheOperationContext.builder().skipReadThrough(true).build(); - return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.withSkipReadThrough() : - new CacheOperationContext( - false, - true, - false, - null, - false, - null, - false, - null, - null)); + return new GridCacheProxyImpl<>(ctx, delegate, opCtx); } finally { gate.leave(prev); @@ -307,18 +297,12 @@ public IgniteInternalCache delegate() { CacheOperationContext prev = gate.enter(opCtx); try { - return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.setApplicationAttributes(attrs) : - new CacheOperationContext( - false, - false, - false, - null, - false, - null, - false, - null, - attrs)); + if (opCtx != null) + opCtx = opCtx.withApplicationAttributes(attrs); + else + opCtx = CacheOperationContext.builder().applicationAttributes(attrs).build(); + + return new GridCacheProxyImpl<>(ctx, delegate, opCtx); } finally { gate.leave(prev); @@ -326,22 +310,17 @@ public IgniteInternalCache delegate() { } /** {@inheritDoc} */ - @Override public GridCacheProxyImpl keepBinary() { - if (opCtx != null && opCtx.isKeepBinary()) - return (GridCacheProxyImpl)this; + @Override public GridCacheProxyImpl keepBinary() { + if (opCtx != null) { + if (opCtx.isKeepBinary()) + return this; + else + opCtx = opCtx.withKeepBinary(); + } + else + opCtx = CacheOperationContext.builder().keepBinary(true).build(); - return new GridCacheProxyImpl<>((GridCacheContext)ctx, - (GridCacheAdapter)delegate, - opCtx != null ? opCtx.keepBinary() : - new CacheOperationContext(false, - false, - true, - null, - false, - null, - false, - null, - null)); + return new GridCacheProxyImpl<>(ctx, delegate, opCtx); } /** {@inheritDoc} */ @@ -1581,18 +1560,12 @@ public IgniteInternalCache delegate() { CacheOperationContext prev = gate.enter(opCtx); try { - return new GridCacheProxyImpl<>(ctx, delegate, - opCtx != null ? opCtx.withExpiryPolicy(plc) : - new CacheOperationContext( - false, - false, - false, - plc, - false, - null, - false, - null, - null)); + if (opCtx != null) + opCtx = opCtx.withExpiryPolicy(plc); + else + opCtx = CacheOperationContext.builder().expiryPolicy(plc).build(); + + return new GridCacheProxyImpl<>(ctx, delegate, opCtx); } finally { gate.leave(prev); @@ -1604,17 +1577,37 @@ public IgniteInternalCache delegate() { CacheOperationContext prev = gate.enter(opCtx); try { - return new GridCacheProxyImpl<>(ctx, delegate, - new CacheOperationContext( - false, - false, - false, - null, - true, - null, - false, - null, - null)); + if (opCtx != null) { + if (opCtx.noRetries()) + return this; + else + opCtx = opCtx.withNoRetries(); + } + else + opCtx = CacheOperationContext.builder().noRetries(true).build(); + + return new GridCacheProxyImpl<>(ctx, delegate, opCtx); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteInternalCache withCalciteEngine() { + CacheOperationContext prev = gate.enter(opCtx); + + try { + if (opCtx != null) { + if (opCtx.calciteEngine()) + return this; + else + opCtx = opCtx.withCalciteEngine(); + } + else + opCtx = CacheOperationContext.builder().calciteEngine(true).build(); + + return new GridCacheProxyImpl<>(ctx, delegate, opCtx); } finally { gate.leave(prev); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index 629bb9433cbd8..2ab4e3d990a7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -272,7 +272,7 @@ public GridCacheContext context0() { /** {@inheritDoc} */ @Override public IgniteCacheProxy cacheNoGate() { - return new GatewayProtectedCacheProxy<>(this, new CacheOperationContext(), false); + return new GatewayProtectedCacheProxy<>(this, CacheOperationContext.instance(), false); } /** @@ -282,7 +282,7 @@ public IgniteCacheProxy gatewayWrapper() { if (cachedProxy != null) return cachedProxy; - cachedProxy = new GatewayProtectedCacheProxy<>(this, new CacheOperationContext(), true); + cachedProxy = new GatewayProtectedCacheProxy<>(this, CacheOperationContext.instance(), true); return cachedProxy; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 50bdf4efe97eb..4b4f8263c7149 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -222,10 +222,9 @@ public interface IgniteInternalCache extends Iterable> { public boolean skipStore(); /** - * @param skipStore Skip store flag. * @return New internal cache instance based on this one, but with skip store flag enabled. */ - public IgniteInternalCache setSkipStore(boolean skipStore); + public IgniteInternalCache withSkipStore(); /** @return New internal cache instance based on this one, but with skip read-through cache store flag enabled. */ public IgniteInternalCache withSkipReadThrough(); @@ -1642,6 +1641,11 @@ public Iterator> scanIterator(boolean keepBinary, @Nullable Ig */ public IgniteInternalCache withExpiryPolicy(ExpiryPolicy plc); + /** + * @return Cache with calcite engine execution flag. + */ + public IgniteInternalCache withCalciteEngine(); + /** * @return Cache with no-retries behavior enabled. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index b02d75024fec5..83a1c18ce1afc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -49,6 +49,9 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { /** */ private static final int SKIP_READ_THROUGH_FLAG_MASK = 0x08; + /** Calcite engine operation call flag bit mask. */ + private static final int CALCITE_OP_CALL_FLAG_MASK = 0x10; + /** Sender node ID. */ @Order(0) public UUID nodeId; @@ -127,6 +130,8 @@ public GridDistributedLockRequest() { * @param keyCnt Number of keys. * @param txSize Expected transaction size. * @param skipStore Skip store flag. + * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. */ public GridDistributedLockRequest( int cacheId, @@ -144,6 +149,7 @@ public GridDistributedLockRequest( int txSize, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean keepBinary ) { super(lockVer, keyCnt, false); @@ -169,6 +175,7 @@ public GridDistributedLockRequest( skipStore(skipStore); skipReadThrough(skipReadThrough); keepBinary(keepBinary); + calciteOpCall(calciteOpCall); } /** @@ -234,14 +241,14 @@ public boolean returnValue(int idx) { * @param skipStore Skip store flag. */ private void skipStore(boolean skipStore) { - flags = skipStore ? (byte)(flags | SKIP_STORE_FLAG_MASK) : (byte)(flags & ~SKIP_STORE_FLAG_MASK); + setFlag(skipStore, SKIP_STORE_FLAG_MASK); } /** * @return Skip store flag. */ public boolean skipStore() { - return (flags & SKIP_STORE_FLAG_MASK) == 1; + return isFlag(SKIP_STORE_FLAG_MASK); } /** @@ -250,45 +257,74 @@ public boolean skipStore() { * @param skipReadThrough Skip read-through cache store flag. */ private void skipReadThrough(boolean skipReadThrough) { - flags = skipReadThrough ? (byte)(flags | SKIP_READ_THROUGH_FLAG_MASK) : (byte)(flags & ~SKIP_READ_THROUGH_FLAG_MASK); + setFlag(skipReadThrough, SKIP_READ_THROUGH_FLAG_MASK); } /** * @return Skip store flag. */ public boolean skipReadThrough() { - return (flags & SKIP_READ_THROUGH_FLAG_MASK) != 0; + return isFlag(SKIP_READ_THROUGH_FLAG_MASK); + } + + /** Sets calcite operation flag. */ + public void calciteOpCall(boolean calciteOpCall) { + setFlag(calciteOpCall, CALCITE_OP_CALL_FLAG_MASK); + } + + /** + * @return Calcite engine operation flag. + */ + public boolean calciteOpCall() { + return isFlag(CALCITE_OP_CALL_FLAG_MASK); } /** * @param keepBinary Keep binary flag. */ private void keepBinary(boolean keepBinary) { - flags = keepBinary ? (byte)(flags | KEEP_BINARY_FLAG_MASK) : (byte)(flags & ~KEEP_BINARY_FLAG_MASK); + setFlag(keepBinary, KEEP_BINARY_FLAG_MASK); } /** * @return Keep binary. */ public boolean keepBinary() { - return (flags & KEEP_BINARY_FLAG_MASK) != 0; + return isFlag(KEEP_BINARY_FLAG_MASK); } /** * @return Flag indicating whether transaction use cache store. */ public boolean storeUsed() { - return (flags & STORE_USED_FLAG_MASK) != 0; + return isFlag(STORE_USED_FLAG_MASK); } /** * @param storeUsed Store used value. */ public void storeUsed(boolean storeUsed) { - if (storeUsed) - flags |= STORE_USED_FLAG_MASK; - else - flags &= ~STORE_USED_FLAG_MASK; + setFlag(storeUsed, STORE_USED_FLAG_MASK); + } + + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + private void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reads flag mask. + * + * @param mask Mask to read. + * @return Flag value. + */ + private boolean isFlag(int mask) { + return (flags & mask) != 0; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index bd427bd47b56f..68a53ca1936bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -216,7 +216,7 @@ public final boolean system() { * @return Flag indicating whether transaction use cache store. */ public boolean storeWriteThrough() { - return (flags & STORE_WRITE_THROUGH_FLAG_MASK) != 0; + return isFlag(STORE_WRITE_THROUGH_FLAG_MASK); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 7cba44d7ef5a4..8487ef7b44e12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -187,6 +187,9 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture cctx, @@ -219,6 +224,7 @@ public GridDhtLockFuture( long accessTtl, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean keepBinary) { super(CU.boolReducer()); @@ -239,6 +245,7 @@ public GridDhtLockFuture( this.accessTtl = accessTtl; this.skipStore = skipStore; this.skipReadThrough = skipReadThrough; + this.calciteOpCall = calciteOpCall; this.keepBinary = keepBinary; if (tx != null) @@ -922,6 +929,7 @@ private void map(Iterable entries) { read ? accessTtl : -1L, skipStore, skipReadThrough, + calciteOpCall, cctx.store().configured(), keepBinary, inTx() ? tx.label() : null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index 315997cecd8a6..09db43379175a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -111,6 +111,8 @@ public GridDhtLockRequest() { * @param storeUsed Cache store used flag. * @param keepBinary Keep binary flag. * @param txLbl Transaction label. + * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. */ public GridDhtLockRequest( int cacheId, @@ -132,6 +134,7 @@ public GridDhtLockRequest( long accessTtl, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean storeUsed, boolean keepBinary, String txLbl @@ -151,6 +154,7 @@ public GridDhtLockRequest( txSize, skipStore, skipReadThrough, + calciteOpCall, keepBinary); this.topVer = topVer; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 8acb9ccb5623d..9bc69ddb02bb9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -742,6 +742,7 @@ else if (txLockMsgLog.isDebugEnabled()) { accessTtl, opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.skipReadThrough(), + opCtx != null && opCtx.calciteEngine(), opCtx != null && opCtx.isKeepBinary()); } @@ -759,6 +760,7 @@ else if (txLockMsgLog.isDebugEnabled()) { * @param accessTtl TTL for read operation. * @param skipStore Skip store flag. * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. * @return Lock future. */ public GridDhtFuture lockAllAsyncInternal(@Nullable Collection keys, @@ -772,6 +774,7 @@ public GridDhtFuture lockAllAsyncInternal(@Nullable Collection(true); @@ -795,6 +798,7 @@ public GridDhtFuture lockAllAsyncInternal(@Nullable Collection lockAllAsync( req.accessTtl(), req.skipStore(), req.skipReadThrough(), + req.calciteOpCall(), req.keepBinary()); // Add before mapping. @@ -1051,6 +1056,7 @@ public IgniteInternalFuture lockAllAsync( req.accessTtl(), req.skipStore(), req.skipReadThrough(), + req.calciteOpCall(), req.keepBinary(), req.nearCache()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index af3655f3b2ffb..391a12b824ce4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -561,6 +561,8 @@ private void addMapping( * @param accessTtl TTL for read operation. * @param needRetVal Return value flag. * @param skipStore Skip store flag. + * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. * @param keepBinary Keep binary flag. * @param nearCache {@code True} if near cache enabled on originating node. * @return Lock future. @@ -576,6 +578,7 @@ IgniteInternalFuture lockAllAsync( long accessTtl, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean keepBinary, boolean nearCache ) { @@ -649,6 +652,7 @@ IgniteInternalFuture lockAllAsync( null, skipStore, skipReadThrough, + calciteOpCall, keepBinary, nearCache); @@ -693,6 +697,7 @@ IgniteInternalFuture lockAllAsync( accessTtl, skipStore, skipReadThrough, + calciteOpCall, keepBinary); } catch (IgniteCheckedException e) { @@ -712,6 +717,7 @@ IgniteInternalFuture lockAllAsync( * @param accessTtl TTL for read operation. * @param skipStore Skip store flag. * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. * @return Future for lock acquisition. */ private IgniteInternalFuture obtainLockAsync( @@ -724,6 +730,7 @@ private IgniteInternalFuture obtainLockAsync( final long accessTtl, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean keepBinary) { if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock on keys [keys=" + passedKeys + ']'); @@ -755,6 +762,7 @@ private IgniteInternalFuture obtainLockAsync( accessTtl, skipStore, skipReadThrough, + calciteOpCall, keepBinary); return new GridEmbeddedFuture<>( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 7af5629a66818..f92f548fc3c4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -498,7 +498,8 @@ private void onEntriesLocked() { if (retVal) { if (err != null || procRes != null) - ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err, keepBinary); + ret.addEntryProcessResult(txEntry.context(), key, null, procRes, err, + txEntry.calciteOpCall() || keepBinary); else ret.invokeResult(true); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 0a4ace0489250..b7d8d732b4932 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -288,7 +288,8 @@ else if (res.error() != null) { AtomicApplicationAttributesAwareRequest req ) { if (req.applicationAttributes() != null) - ctx.operationContextPerCall(new CacheOperationContext().setApplicationAttributes(req.applicationAttributes())); + ctx.operationContextPerCall(CacheOperationContext.builder().applicationAttributes(req.applicationAttributes()) + .build()); try { processNearAtomicUpdateRequest(nodeId, req.payload()); @@ -1062,7 +1063,7 @@ private IgniteInternalFuture updateAll0( final CacheOperationContext opCtx = ctx.operationContextPerCall(); - if (opCtx != null && opCtx.hasDataCenterId()) { + if (opCtx != null && opCtx.dataCenterId() != null) { assert conflictPutVals == null : conflictPutVals; assert conflictRmvVals == null : conflictRmvVals; @@ -1111,7 +1112,8 @@ else if (op == GridCacheOperation.DELETE) { opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery(), opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - opCtx != null ? opCtx.applicationAttributes() : null); + opCtx != null ? opCtx.applicationAttributes() : null, + opCtx != null && opCtx.calciteEngine()); if (async) { return asyncOp(new CO>() { @@ -1254,7 +1256,7 @@ else if (proc != null) { GridCacheDrInfo conflictPutVal = null; GridCacheVersion conflictRmvVer = null; - if (opCtx != null && opCtx.hasDataCenterId()) { + if (opCtx != null && opCtx.dataCenterId() != null) { Byte dcId = opCtx.dataCenterId(); assert dcId != null; @@ -1299,7 +1301,8 @@ else if (op == GridCacheOperation.TRANSFORM) { opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery(), opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - opCtx != null ? opCtx.applicationAttributes() : null + opCtx != null ? opCtx.applicationAttributes() : null, + opCtx != null && opCtx.calciteEngine() ); } else { @@ -1322,7 +1325,8 @@ else if (op == GridCacheOperation.TRANSFORM) { opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery(), opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - opCtx != null ? opCtx.applicationAttributes() : null); + opCtx != null ? opCtx.applicationAttributes() : null, + opCtx != null && opCtx.calciteEngine()); } } @@ -1352,7 +1356,7 @@ private IgniteInternalFuture removeAllAsync0( Collection drVers = null; - if (opCtx != null && keys != null && opCtx.hasDataCenterId()) { + if (opCtx != null && keys != null && opCtx.dataCenterId() != null) { assert conflictMap == null : conflictMap; drVers = F.transform(keys, new C1() { @@ -1381,7 +1385,8 @@ private IgniteInternalFuture removeAllAsync0( opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery(), opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, - opCtx != null ? opCtx.applicationAttributes() : null); + opCtx != null ? opCtx.applicationAttributes() : null, + opCtx != null && opCtx.calciteEngine()); if (async) { return asyncOp(new CO>() { @@ -2695,7 +2700,7 @@ else if (GridDhtCacheEntry.ReaderId.contains(readers, nearNode.id())) { null, compRes.get1(), compRes.get2(), - req.keepBinary()); + req.calciteOpCall() || req.keepBinary()); } } else { @@ -3174,7 +3179,8 @@ else if (req.operation() == UPDATE) { req.keepBinary(), req.recovery(), MAX_RETRIES, - opCtx == null ? null : opCtx.applicationAttributes()); + opCtx == null ? null : opCtx.applicationAttributes(), + req.calciteOpCall()); updateFut.map(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index a9b194b10e0ac..317b4ab7f1c34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -150,6 +150,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture /** Operation result. */ protected GridCacheReturn opRes; + /** Calcite operation call. */ + protected boolean calciteOpCall; + /** * Constructor. * @@ -167,6 +170,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture * @param recovery {@code True} if cache operation is called in recovery mode. * @param remapCnt Remap count. * @param appAttrs Application attributes. + * @param calciteOpCall Calcite engine call flag. */ protected GridNearAtomicAbstractUpdateFuture( GridCacheContext cctx, @@ -183,7 +187,8 @@ protected GridNearAtomicAbstractUpdateFuture( boolean keepBinary, boolean recovery, int remapCnt, - @Nullable Map appAttrs + @Nullable Map appAttrs, + boolean calciteOpCall ) { if (log == null) { msgLog = cctx.shared().atomicMessageLogger(); @@ -209,6 +214,7 @@ protected GridNearAtomicAbstractUpdateFuture( this.remapCnt = remapCnt; this.appAttrs = appAttrs; + this.calciteOpCall = calciteOpCall; } /** @@ -360,7 +366,7 @@ final void completeFuture(@Nullable GridCacheReturn ret, Throwable err, @Nullabl : (this.retval || op == TRANSFORM) ? cctx.unwrapBinaryIfNeeded( ret.value(), - keepBinary, + calciteOpCall || keepBinary, U.deploymentClassLoader(cctx.kernalContext(), deploymentLdrId)) : ret.success(); @@ -911,7 +917,7 @@ protected abstract class UpdateReplyClosureContextAware implements GridDhtAtomic CacheOperationContext prevOpCtx = cctx.operationContextPerCall(); if (appAttrs != null) - cctx.operationContextPerCall(new CacheOperationContext().setApplicationAttributes(appAttrs)); + cctx.operationContextPerCall(CacheOperationContext.builder().applicationAttributes(appAttrs).build()); try { apply0(req, res); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java index 73a75435a5959..eb5f516b44c5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -72,6 +72,9 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMes /** */ private static final int SKIP_READ_THROUGH_FLAG_MASK = 0x100; + /** */ + private static final int CALCITE_OP_FLAG_MASK = 0x200; + /** Target node ID. */ protected UUID nodeId; @@ -152,6 +155,7 @@ protected GridNearAtomicAbstractUpdateRequest( * @param skipStore Skip write-through to a CacheStore flag. * @param keepBinary Keep binary flag. * @param recovery Recovery mode flag. + * @param calciteOpCall Calcite engine operation call. * @return Flags. */ static short flags( @@ -163,7 +167,8 @@ static short flags( boolean skipStore, boolean keepBinary, boolean recovery, - boolean skipReadThrough + boolean skipReadThrough, + boolean calciteOpCall ) { short flags = 0; @@ -194,6 +199,9 @@ static short flags( if (skipReadThrough) flags |= SKIP_READ_THROUGH_FLAG_MASK; + if (calciteOpCall) + flags |= CALCITE_OP_FLAG_MASK; + return flags; } @@ -356,6 +364,11 @@ public final boolean skipReadThrough() { return isFlag(SKIP_READ_THROUGH_FLAG_MASK); } + /** */ + public final boolean calciteOpCall() { + return isFlag(CALCITE_OP_FLAG_MASK); + } + /** * @return Keep binary flag. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index e4ea94e5c80eb..1c78893a132eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.CacheStoppedException; import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy; @@ -82,6 +83,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @param keepBinary Keep binary flag. * @param recovery {@code True} if cache operation is called in recovery mode. * @param remapCnt Maximum number of retries. + * @param calciteOpCall Calcite engine opearation call. */ public GridNearAtomicSingleUpdateFuture( GridCacheContext cctx, @@ -100,7 +102,8 @@ public GridNearAtomicSingleUpdateFuture( boolean keepBinary, boolean recovery, int remapCnt, - @Nullable Map appAttrs + @Nullable Map appAttrs, + boolean calciteOpCall ) { super(cctx, cache, @@ -116,7 +119,8 @@ public GridNearAtomicSingleUpdateFuture( keepBinary, recovery, remapCnt, - appAttrs); + appAttrs, + calciteOpCall); this.key = key; this.val = val; } @@ -545,6 +549,8 @@ private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, long GridNearAtomicAbstractUpdateRequest req; + CacheOperationContext opCtx = cctx.operationContextPerCall(); + short flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled, topLocked, retval, @@ -553,7 +559,8 @@ private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, long skipStore, keepBinary, recovery, - skipReadThrough); + skipReadThrough, + opCtx != null && opCtx.calciteEngine()); if (canUseSingleRequest()) { if (op == TRANSFORM) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 9a062e60fc748..886950e8788bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheOperationContext; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.CacheStoppedException; import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy; @@ -111,6 +112,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu * @param keepBinary Keep binary flag. * @param remapCnt Maximum number of retries. * @param appAttrs Application attributes. + * @param calciteOpCall Calcite engine call flag. */ public GridNearAtomicUpdateFuture( GridCacheContext cctx, @@ -131,7 +133,8 @@ public GridNearAtomicUpdateFuture( boolean keepBinary, boolean recovery, int remapCnt, - @Nullable Map appAttrs + @Nullable Map appAttrs, + boolean calciteOpCall ) { super( cctx, @@ -148,7 +151,8 @@ public GridNearAtomicUpdateFuture( keepBinary, recovery, remapCnt, - appAttrs); + appAttrs, + calciteOpCall); assert vals == null || vals.size() == keys.size(); assert conflictPutVals == null || conflictPutVals.size() == keys.size(); @@ -992,6 +996,8 @@ else if (conflictRmvVals != null) { PrimaryRequestState mapped = pendingMappings.get(nodeId); + CacheOperationContext opCtx = cctx.operationContextPerCall(); + if (mapped == null) { short flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled, topLocked, @@ -1001,7 +1007,8 @@ else if (conflictRmvVals != null) { skipStore, keepBinary, recovery, - skipReadThrough); + skipReadThrough, + opCtx != null && opCtx.calciteEngine()); GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest( cctx.cacheId(), @@ -1106,6 +1113,8 @@ else if (conflictRmvVals != null) { boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1 || nearEnabled; + CacheOperationContext opCtx = cctx.operationContextPerCall(); + short flags = GridNearAtomicAbstractUpdateRequest.flags(nearEnabled, topLocked, retval, @@ -1114,7 +1123,8 @@ else if (conflictRmvVals != null) { skipStore, keepBinary, recovery, - skipReadThrough); + skipReadThrough, + opCtx != null && opCtx.calciteEngine()); GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest( cctx.cacheId(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 9109e5b49a579..03f93d95ef979 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -205,6 +205,7 @@ public GridDistributedCacheEntry entryExx( false, opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.skipReadThrough(), + opCtx != null && opCtx.calciteEngine(), recovery, readRepairStrategy, needVer); @@ -308,6 +309,7 @@ public GridDistributedCacheEntry entryExx( false, opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.skipReadThrough(), + opCtx != null && opCtx.calciteEngine(), recovery, readRepairStrategy, needVer); @@ -661,6 +663,7 @@ else if (!skipVals && ctx.statisticsEnabled()) accessTtl, opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.skipReadThrough(), + opCtx != null && opCtx.calciteEngine(), opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery()); @@ -903,6 +906,7 @@ public void removeLocks(long threadId, GridCacheVersion ver, Collection lockAllAsync( @@ -919,6 +923,7 @@ IgniteInternalFuture lockAllAsync( final long accessTtl, final boolean skipStore, final boolean skipReadThrough, + boolean calciteOpCall, final boolean keepBinary ) { assert keys != null; @@ -944,6 +949,7 @@ IgniteInternalFuture lockAllAsync( accessTtl, skipStore, skipReadThrough, + calciteOpCall, keepBinary); } else { @@ -966,6 +972,7 @@ IgniteInternalFuture lockAllAsync( accessTtl, skipStore, skipReadThrough, + calciteOpCall, keepBinary); } } @@ -986,6 +993,8 @@ IgniteInternalFuture lockAllAsync( * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @param skipStore Skip store flag. + * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. * @return Lock future. */ private IgniteInternalFuture lockAllAsync0( @@ -1002,6 +1011,7 @@ private IgniteInternalFuture lockAllAsync0( final long accessTtl, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean keepBinary) { int cnt = keys.size(); @@ -1020,6 +1030,7 @@ private IgniteInternalFuture lockAllAsync0( accessTtl, skipStore, skipReadThrough, + calciteOpCall, keepBinary); // Add before mapping. @@ -1089,6 +1100,7 @@ else if (!b) accessTtl, skipStore, skipReadThrough, + calciteOpCall, keepBinary); return new GridDhtEmbeddedFuture<>( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 6bf6c13aedc80..775463af60f60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -170,6 +170,9 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF /** Skip read-through cache store flag. */ private final boolean skipReadThrough; + /** Calcite engine operation flag. */ + private final boolean calciteOpCall; + /** */ private Deque mappings; @@ -198,6 +201,8 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @param skipStore Skip store flag. + * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. */ public GridDhtColocatedLockFuture( GridCacheContext cctx, @@ -210,6 +215,7 @@ public GridDhtColocatedLockFuture( long accessTtl, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean keepBinary, boolean recovery ) { @@ -229,6 +235,7 @@ public GridDhtColocatedLockFuture( this.skipReadThrough = skipReadThrough; this.keepBinary = keepBinary; this.recovery = recovery; + this.calciteOpCall = calciteOpCall; ignoreInterrupts(); @@ -990,8 +997,6 @@ private synchronized void map0( if (log.isDebugEnabled()) log.debug("Starting (re)map for mappings [mappings=" + mappings + ", fut=" + this + ']'); - boolean hasRmtNodes = false; - boolean first = true; // Create mini futures. @@ -1086,6 +1091,7 @@ private synchronized void map0( read ? accessTtl : -1L, skipStore, skipReadThrough, + calciteOpCall, keepBinary, clientFirst, false, @@ -1128,11 +1134,8 @@ private synchronized void map0( } } - if (!distributedKeys.isEmpty()) { + if (!distributedKeys.isEmpty()) mapping.distributedKeys(distributedKeys); - - hasRmtNodes |= !mapping.node().isLocal(); - } else { assert mapping.request() == null; @@ -1259,6 +1262,7 @@ private void lockLocally( accessTtl, skipStore, skipReadThrough, + calciteOpCall, keepBinary); // Add new future. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 6a5df13cd5a4a..dd3290ee7b5f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -160,6 +160,9 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture mappings; @@ -185,6 +188,8 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture cctx, @@ -197,6 +202,7 @@ public GridNearLockFuture( long accessTtl, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean keepBinary, boolean recovery ) { @@ -215,6 +221,7 @@ public GridNearLockFuture( this.accessTtl = accessTtl; this.skipStore = skipStore; this.skipReadThrough = skipReadThrough; + this.calciteOpCall = calciteOpCall; this.keepBinary = keepBinary; this.recovery = recovery; @@ -1070,6 +1077,7 @@ private void map(Iterable keys, boolean remap, boolean topLocked read ? accessTtl : -1L, skipStore, skipReadThrough, + calciteOpCall, keepBinary, clientFirst, true, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index a6f63d1312253..cfe6f7a4b2d19 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -130,6 +130,7 @@ public GridNearLockRequest( long accessTtl, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean keepBinary, boolean firstClientReq, boolean nearCache, @@ -151,6 +152,7 @@ public GridNearLockRequest( txSize, skipStore, skipReadThrough, + calciteOpCall, keepBinary); assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 72c4f540b7acc..b4cd4cfe2e60e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -495,7 +495,7 @@ private void proceedPrepare(final Queue mappings) { } /** - * Continues prepare after previous mapping successfully finished. + * Continues to prepare after previous mapping successfully finished. * * @param m Mapping. * @param mappings Queue of mappings. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index e6b2678065926..0c330dc771173 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -133,6 +133,7 @@ public void dht(GridDhtCache dht) { final boolean skipStore = opCtx != null && opCtx.skipStore(); final boolean skipReadThrough = opCtx != null && opCtx.skipReadThrough(); + boolean calciteOpCall = opCtx != null && opCtx.calciteEngine(); if (tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp>(keys) { @@ -145,6 +146,7 @@ public void dht(GridDhtCache dht) { false, skipStore, skipReadThrough, + calciteOpCall, recovery, readRepairStrategy, needVer); @@ -307,6 +309,7 @@ private void processLockResponse(UUID nodeId, GridNearLockResponse res) { accessTtl, opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.skipReadThrough(), + opCtx != null && opCtx.calciteEngine(), opCtx != null && opCtx.isKeepBinary(), opCtx != null && opCtx.recovery()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 04183f5c45431..381107ae40a46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -623,6 +623,7 @@ private IgniteInternalFuture putAsync0( ret, opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.skipReadThrough(), + opCtx != null && opCtx.calciteEngine(), keepBinary, opCtx != null && opCtx.recovery(), dataCenterId); @@ -750,7 +751,7 @@ private IgniteInternalFuture putAllAsync0( final Byte dataCenterId; - if (opCtx != null && opCtx.hasDataCenterId()) { + if (opCtx != null && opCtx.dataCenterId() != null) { assert drMap == null : drMap; dataCenterId = opCtx.dataCenterId(); @@ -799,6 +800,7 @@ private IgniteInternalFuture putAllAsync0( null, opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.skipReadThrough(), + opCtx != null && opCtx.calciteEngine(), false, keepBinary, opCtx != null && opCtx.recovery(), @@ -899,6 +901,7 @@ private IgniteInternalFuture putAllAsync0( * @param ret Return value. * @param skipStore Skip store flag. * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. * @param recovery Recovery flag. * @param dataCenterId Optional data center Id. * @return Future for entry values loading. @@ -916,6 +919,7 @@ private IgniteInternalFuture enlistWrite( final GridCacheReturn ret, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean keepBinary, boolean recovery, Byte dataCenterId) { @@ -954,6 +958,7 @@ private IgniteInternalFuture enlistWrite( /*enlisted*/null, skipStore, skipReadThrough, + calciteOpCall, false, hasFilters, needVal, @@ -1021,6 +1026,7 @@ private IgniteInternalFuture enlistWrite( * @param drRmvMap DR remove map (optional). * @param skipStore Skip store flag. * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. * @param keepBinary Keep binary flag. * @param recovery Recovery flag. @@ -1043,6 +1049,7 @@ private IgniteInternalFuture enlistWrite( @Nullable Map drRmvMap, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, final boolean singleRmv, final boolean keepBinary, final boolean recovery, @@ -1148,6 +1155,7 @@ else if (dataCenterId != null) { enlisted, skipStore, skipReadThrough, + calciteOpCall, singleRmv, hasFilters, needVal, @@ -1220,6 +1228,7 @@ else if (dataCenterId != null) { * @param enlisted Enlisted keys collection. * @param skipStore Skip store flag. * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. * @param singleRmv {@code True} for single remove operation. * @param hasFilters {@code True} if filters not empty. * @param needVal {@code True} if value is needed. @@ -1243,6 +1252,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, @Nullable final Collection enlisted, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean singleRmv, boolean hasFilters, final boolean needVal, @@ -1363,6 +1373,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, drVer, skipStore, skipReadThrough, + calciteOpCall, keepBinary, CU.isNearEnabled(cacheCtx)); } @@ -1380,6 +1391,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, null, skipStore, skipReadThrough, + calciteOpCall, keepBinary, CU.isNearEnabled(cacheCtx)); } @@ -1417,6 +1429,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, drVer, skipStore, skipReadThrough, + calciteOpCall, keepBinary, CU.isNearEnabled(cacheCtx)); @@ -1537,6 +1550,7 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx, drVer, skipStore, skipReadThrough, + calciteOpCall, keepBinary, CU.isNearEnabled(cacheCtx)); @@ -1619,7 +1633,7 @@ private IgniteInternalFuture removeAllAsync0( final Byte dataCenterId; - if (opCtx != null && opCtx.hasDataCenterId()) { + if (opCtx != null && opCtx.dataCenterId() != null) { assert drMap == null : drMap; dataCenterId = opCtx.dataCenterId(); @@ -1689,6 +1703,7 @@ private IgniteInternalFuture removeAllAsync0( drMap, opCtx != null && opCtx.skipStore(), opCtx != null && opCtx.skipReadThrough(), + opCtx != null && opCtx.calciteEngine(), singleRmv, keepBinary, opCtx != null && opCtx.recovery(), @@ -1825,6 +1840,7 @@ private IgniteInternalFuture removeAllAsync0( * @param keepCacheObjects Keep cache objects * @param skipStore Skip store flag. * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. * @param readRepairStrategy Read Repair strategy. * @return Future for this get. */ @@ -1838,6 +1854,7 @@ public IgniteInternalFuture> getAllAsync( final boolean keepCacheObjects, final boolean skipStore, final boolean skipReadThrough, + boolean calciteOpCall, final boolean recovery, final ReadRepairStrategy readRepairStrategy, final boolean needVer) { @@ -1881,6 +1898,7 @@ public IgniteInternalFuture> getAllAsync( keepCacheObjects, skipStore, skipReadThrough, + calciteOpCall, recovery, readRepairStrategy, needVer); @@ -2069,6 +2087,7 @@ public IgniteInternalFuture> getAllAsync( null, skipStore, skipReadThrough, + calciteOpCall, !deserializeBinary, recovery, null); @@ -2203,6 +2222,7 @@ public IgniteInternalFuture> getAllAsync( * @param keepCacheObjects Keep cache objects flag. * @param skipStore Skip store flag. * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. * @param recovery Recovery flag. * @return Enlisted keys. * @throws IgniteCheckedException If failed. @@ -2221,6 +2241,7 @@ private Collection enlistRead( boolean keepCacheObjects, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean recovery, ReadRepairStrategy readRepairStrategy, final boolean needVer @@ -2469,6 +2490,7 @@ private Collection enlistRead( null, skipStore, skipReadThrough, + calciteOpCall, !deserializeBinary, CU.isNearEnabled(cacheCtx)); @@ -4199,6 +4221,7 @@ public IgniteInternalFuture rollbackAsyncLocal() { * @param Key type. * @param skipStore Skip store flag. * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. * @param keepBinary Keep binary flag. * @return Future with respond. */ @@ -4210,6 +4233,7 @@ public IgniteInternalFuture lockAllAsync(GridCacheContext c long accessTtl, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean keepBinary) { assert pessimistic(); @@ -4246,6 +4270,7 @@ public IgniteInternalFuture lockAllAsync(GridCacheContext c accessTtl, skipStore, skipReadThrough, + calciteOpCall, keepBinary); return new GridEmbeddedFuture<>( diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index c4f024b2d25b0..0dc68edec2509 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -93,6 +93,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** Skip read-through cache store flag bit mask. */ private static final int TX_ENTRY_SKIP_READ_THROUGH_FLAG_MASK = 1 << 5; + /** Calcite engine operation call flag bit mask. */ + private static final int CALCITE_OP_CALL_FLAG_MASK = 1 << 6; + /** Prepared flag updater. */ private static final AtomicIntegerFieldUpdater PREPARED_UPD = AtomicIntegerFieldUpdater.newUpdater(IgniteTxEntry.class, "prepared"); @@ -280,6 +283,7 @@ public IgniteTxEntry(GridCacheContext ctx, * @param conflictVer Data center replication version. * @param skipStore Skip store flag. * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. * @param addReader Add reader flag. */ public IgniteTxEntry(GridCacheContext ctx, @@ -294,6 +298,7 @@ public IgniteTxEntry(GridCacheContext ctx, GridCacheVersion conflictVer, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean keepBinary, boolean addReader ) { @@ -312,6 +317,7 @@ public IgniteTxEntry(GridCacheContext ctx, skipStore(skipStore); skipReadThrough(skipReadThrough); + calciteOpCall(calciteOpCall); keepBinary(keepBinary); addReader(addReader); @@ -618,12 +624,24 @@ public void skipReadThrough(boolean skipReadThrough) { } /** - * @return Skip store flag. + * @return Skip read through flag. */ public boolean skipReadThrough() { return isFlag(TX_ENTRY_SKIP_READ_THROUGH_FLAG_MASK); } + /** Sets calcite operation flag. */ + public void calciteOpCall(boolean calciteOpCall) { + setFlag(calciteOpCall, CALCITE_OP_CALL_FLAG_MASK); + } + + /** + * @return Calcite engine operation flag. + */ + public boolean calciteOpCall() { + return isFlag(CALCITE_OP_CALL_FLAG_MASK); + } + /** * @param oldValOnPrimary {@code True} If old value for was non null on primary node. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index bf2ea8938c245..7e7b7ca5cced4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1338,6 +1338,8 @@ protected void checkValid(boolean checkTimeout) throws IgniteCheckedException { * @param drExpireTime DR expire time (if any). * @param drVer DR version. * @param skipStore Skip store flag. + * @param skipReadThrough Skip read-through cache store flag. + * @param calciteOpCall Calcite engine operation call. * @return Transaction entry. */ public final IgniteTxEntry addEntry(GridCacheOperation op, @@ -1353,6 +1355,7 @@ public final IgniteTxEntry addEntry(GridCacheOperation op, @Nullable GridCacheVersion drVer, boolean skipStore, boolean skipReadThrough, + boolean calciteOpCall, boolean keepBinary, boolean addReader ) { @@ -1426,6 +1429,7 @@ public final IgniteTxEntry addEntry(GridCacheOperation op, drVer, skipStore, skipReadThrough, + calciteOpCall, keepBinary, addReader); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java index fd144b035c639..3f1456e2155e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueAdapter.java @@ -433,17 +433,10 @@ protected GridCacheQueueAdapter(String queueName, GridCacheQueueHeader hdr, Grid if (opCtx != null && opCtx.isKeepBinary()) return (GridCacheQueueAdapter)this; - opCtx = opCtx == null ? new CacheOperationContext( - false, - false, - true, - null, - false, - null, - false, - null, - null) - : opCtx.keepBinary(); + if (opCtx == null) + opCtx = CacheOperationContext.builder().keepBinary(true).build(); + else + opCtx = opCtx.withKeepBinary(); cctx.operationContextPerCall(opCtx); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java index 669aa52dda523..8a50386849e33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@ -751,8 +751,10 @@ private IgniteInternalFuture executeCommand( destId == null || destId.equals(ctx.localNodeId()) || replicatedCacheAvailable(cacheName); if (locExec) { - IgniteInternalCache prj = localCache(cacheName) - .setSkipStore(cacheFlags.contains(SKIP_STORE)); + IgniteInternalCache prj = localCache(cacheName); + + if (cacheFlags.contains(SKIP_STORE)) + prj = prj.withSkipStore(); if (cacheFlags.contains(KEEP_BINARIES)) prj = prj.keepBinary(); @@ -919,8 +921,10 @@ private FlaggedCacheOperationCallable(String cacheName, /** {@inheritDoc} */ @Override public GridRestResponse call() throws Exception { - IgniteInternalCache prj = cache(g, cacheName) - .setSkipStore(cacheFlags.contains(SKIP_STORE)); + IgniteInternalCache prj = cache(g, cacheName); + + if (cacheFlags.contains(SKIP_STORE)) + prj = prj.withSkipStore(); if (cacheFlags.contains(KEEP_BINARIES)) prj = prj.keepBinary(); diff --git a/modules/core/src/test/java/org/apache/ignite/util/TestStorageUtils.java b/modules/core/src/test/java/org/apache/ignite/util/TestStorageUtils.java index 9c696e81355c0..85da23aa1ae7c 100644 --- a/modules/core/src/test/java/org/apache/ignite/util/TestStorageUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/util/TestStorageUtils.java @@ -29,6 +29,9 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.jetbrains.annotations.Nullable; + +import static org.junit.Assert.assertNotNull; /** * Test methods for storage manipulation. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java index 95122b2e42936..08b7caf38871e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java @@ -555,9 +555,9 @@ public static CacheOperationContext setKeepBinaryContext(GridCacheContext if (opCtx == null) // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary - newOpCtx = new CacheOperationContext(false, false, true, null, false, null, false, null, null); + newOpCtx = CacheOperationContext.builder().keepBinary(true).build(); else if (!opCtx.isKeepBinary()) - newOpCtx = opCtx.keepBinary(); + newOpCtx = opCtx.withKeepBinary(); if (newOpCtx != null) cctx.operationContextPerCall(newOpCtx);