diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java index 76220c84251fe..fcf2b8ebd3315 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java @@ -29,6 +29,8 @@ import org.apache.calcite.rel.core.TableModify; import org.apache.calcite.rel.type.RelDataType; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cache.context.SessionContextImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; @@ -50,6 +52,12 @@ * */ public class ModifyNode extends AbstractNode implements SingleNode, Downstream { + /** + * Timeout to wait for async invoke operation to complete. In case this timeout exceeded, release thread and + * continue execution after invoke operation finished. + */ + private static final long INVOKE_TIMEOUT = 100; + /** */ protected final CacheTableDescriptor desc; @@ -77,6 +85,9 @@ public class ModifyNode extends AbstractNode implements SingleNode>> invokeFut; + /** * @param ctx Execution context. * @param desc Table descriptor. @@ -119,22 +130,23 @@ public ModifyNode( waiting--; - switch (op) { - case DELETE: - case UPDATE: - case INSERT: - case MERGE: - tuples.add(desc.toTuple(context(), row, op, cols)); + tuples.add(desc.toTuple(context(), row, op, cols)); - flushTuples(false); + if (invokeFut != null) // Still waiting for previous invocation result. + return; - break; - default: - throw new UnsupportedOperationException(op.name()); - } + flushTuples(false); - if (waiting == 0) - source().request(waiting = MODIFY_BATCH_SIZE); + if (invokeFut != null) { + invokeFut.listen(f -> { + // Push new task to execute in correct thread. + context().execute(() -> { + processInvokeResult(f.get()); + + flushTuples(false); + }, this::onError); + }); + } } /** {@inheritDoc} */ @@ -171,8 +183,27 @@ private void tryEnd() throws Exception { source().request(waiting = MODIFY_BATCH_SIZE); if (state == State.UPDATED && requested > 0) { + if (invokeFut != null) { // Still waiting for previous invocation result. + invokeFut.listen(f -> context().execute(this::tryEnd, this::onError)); + + return; + } + flushTuples(true); + if (invokeFut != null) { + invokeFut.listen(f -> { + // Push new task to execute in correct thread. + context().execute(() -> { + processInvokeResult(f.get()); + + tryEnd(); + }, this::onError); + }); + + return; + } + state = State.END; inLoop = true; @@ -193,7 +224,7 @@ private void tryEnd() throws Exception { /** */ @SuppressWarnings("unchecked") - private void flushTuples(boolean force) throws IgniteCheckedException { + private void flushTuples(boolean force) throws Exception { if (F.isEmpty(tuples) || !force && tuples.size() < MODIFY_BATCH_SIZE) return; @@ -225,7 +256,7 @@ private void flushTuples(boolean force) throws IgniteCheckedException { private void invokeOutsideTransaction( List tuples, IgniteInternalCache cache - ) throws IgniteCheckedException { + ) throws Exception { SessionContextImpl sesCtx = context().unwrap(SessionContextImpl.class); Map sesAttrs = sesCtx == null ? null : sesCtx.attributes(); @@ -233,7 +264,22 @@ private void invokeOutsideTransaction( cache = cache.withApplicationAttributes(sesAttrs); Map> map = invokeMap(tuples); - Map> res = cacheForDML(cache).invokeAll(map); + invokeFut = cacheForDML(cache).invokeAllAsync(map); + + try { + // Shortcut - give a chance for operation to be executed in sync mode (it will simplify workflow). + Map> res = invokeFut.get(INVOKE_TIMEOUT); + + processInvokeResult(res); + } + catch (IgniteFutureTimeoutCheckedException ignore) { + // No-op. Result processing task will be scheduled by caller if invokeFut != null. + } + } + + /** */ + private void processInvokeResult(Map> res) throws Exception { + invokeFut = null; long updated = res.values().stream().mapToLong(EntryProcessorResult::get).sum(); @@ -247,6 +293,9 @@ private void invokeOutsideTransaction( } updatedRows += updated; + + if (waiting == 0) + source().request(waiting = MODIFY_BATCH_SIZE); } /** @@ -260,7 +309,7 @@ private void invokeInsideTransaction( List tuples, IgniteInternalCache cache, GridNearTxLocal userTx - ) throws IgniteCheckedException { + ) throws Exception { userTx.resume(); try { diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java index 6cf79d4f5dfa3..f6c3d74f379ba 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java @@ -88,7 +88,10 @@ public class AbstractExecutionTest extends GridCommonAbstractTest { protected static final String PARAMS_STRING = "Task executor = {0}, Execution strategy = {1}"; /** */ - protected static final int IN_BUFFER_SIZE = AbstractNode.IN_BUFFER_SIZE; + public static final int IN_BUFFER_SIZE = AbstractNode.IN_BUFFER_SIZE; + + /** */ + public static final int MODIFY_BATCH_SIZE = AbstractNode.MODIFY_BATCH_SIZE; /** */ private Throwable lastE; diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TxThreadLockingTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TxThreadLockingTest.java new file mode 100644 index 0000000000000..6c451ac68db15 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/tx/TxThreadLockingTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.tx; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractExecutionTest; +import org.apache.ignite.internal.processors.query.calcite.integration.AbstractBasicIntegrationTest; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.transactions.Transaction; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; + +/** */ +@RunWith(Parameterized.class) +public class TxThreadLockingTest extends AbstractBasicIntegrationTest { + /** */ + private static final long TIMEOUT = 10_000L; + + /** */ + private static final int POOL_SIZE = 10; + + /** */ + private static final int IN_BUFFER_SIZE = AbstractExecutionTest.IN_BUFFER_SIZE; + + /** */ + private static final int MODIFY_BATCH_SIZE = AbstractExecutionTest.MODIFY_BATCH_SIZE; + + /** Use query blocking executor. */ + @Parameterized.Parameter(0) + public boolean qryBlockingExecutor; + + /** */ + @Parameterized.Parameters(name = "qryBlockingExecutor={0}") + public static Collection parameters() { + return List.of(false, true); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + System.setProperty(IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR, String.valueOf(qryBlockingExecutor)); + + startGrid(0); + + client = startClientGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + System.clearProperty(IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName).setQueryThreadPoolSize(POOL_SIZE); + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration cacheConfiguration() { + return super.cacheConfiguration().setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + } + + /** */ + @Test + public void testThreadsLockingByDml() throws Exception { + createAndPopulateTable(); + + CountDownLatch unlockLatch = new CountDownLatch(1); + + IgniteInternalFuture lockKeyFut = lockKeyAndWaitForLatch(0, unlockLatch); + + IgniteInternalFuture updFut = GridTestUtils.runMultiThreadedAsync(() -> { + sql("UPDATE person SET salary = 1 WHERE id = 0"); + }, POOL_SIZE + 1, "update-thread"); + + // Ensure update queries are blocked. + assertFalse(GridTestUtils.waitForCondition(updFut::isDone, 200)); + + IgniteInternalFuture selectFut = GridTestUtils.runMultiThreadedAsync(() -> { + for (int i = 0; i < 100; i++) + sql("SELECT * FROM person WHERE id = 1"); + }, POOL_SIZE + 1, "select-thread"); + + // Ensure other queries are not blocked by update queries. + selectFut.get(TIMEOUT, TimeUnit.MILLISECONDS); + + unlockLatch.countDown(); + + lockKeyFut.get(); + + // Ensure update queries are released. + updFut.get(TIMEOUT, TimeUnit.MILLISECONDS); + } + + /** */ + @Test + public void testDifferentBlockedBatchSize() throws Exception { + assertTrue("Unexpected constants [MODIFY_BATCH_SIZE=" + MODIFY_BATCH_SIZE + + ", IN_BUFFER_SIZE=" + IN_BUFFER_SIZE + ']', MODIFY_BATCH_SIZE < IN_BUFFER_SIZE); + + createAndPopulateTable(); + + IgniteCache cache = client.cache(TABLE_NAME); + + for (int i = 0; i < IN_BUFFER_SIZE + 1; i++) + cache.put(i, new Employer("Test" + i, (double)i)); + + for (int size : new int[] {MODIFY_BATCH_SIZE, MODIFY_BATCH_SIZE + 1, IN_BUFFER_SIZE, IN_BUFFER_SIZE + 1}) { + log.info("Blocked batch size: " + size); + + CountDownLatch unlockLatch = new CountDownLatch(1); + + IgniteInternalFuture lockKeyFut = lockKeyAndWaitForLatch(0, unlockLatch); + + IgniteInternalFuture>> updFut = GridTestUtils.runAsync(() -> + sql("UPDATE person SET salary = 1 WHERE id < ?", size)); + + // Ensure update query is blocked. + assertFalse(GridTestUtils.waitForCondition(updFut::isDone, 200)); + + unlockLatch.countDown(); + + lockKeyFut.get(); + + List> res = updFut.get(TIMEOUT, TimeUnit.MILLISECONDS); + + assertEquals((long)size, res.get(0).get(0)); + } + } + + /** */ + @Test + public void testErrorAfterAsyncWait() throws Exception { + createAndPopulateTable(); + + CountDownLatch unlockLatch = new CountDownLatch(1); + + IgniteInternalFuture lockKeyFut = lockKeyAndWaitForLatch(0, unlockLatch); + + IgniteInternalFuture>> insFut = GridTestUtils.runAsync(() -> + sql("INSERT INTO person (id, name, salary) VALUES (0, 'Test', 0)")); + + // Ensure insert query is blocked. + assertFalse(GridTestUtils.waitForCondition(insFut::isDone, 200)); + + unlockLatch.countDown(); + + lockKeyFut.get(); + + // Ensure exception is propagated to query initiator after async batch execution finished. + try { + insFut.get(TIMEOUT, TimeUnit.MILLISECONDS); + + fail("Exception wasn't thrown"); + } + catch (IgniteCheckedException expected) { + assertTrue(X.hasCause(expected, "Failed to INSERT some keys because they are already in cache", + IgniteSQLException.class)); + } + } + + /** */ + private IgniteInternalFuture lockKeyAndWaitForLatch(int key, CountDownLatch unlockLatch) throws Exception { + CountDownLatch lockLatch = new CountDownLatch(1); + + IgniteInternalFuture lockKeyFut = GridTestUtils.runAsync(() -> { + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + client.cache(TABLE_NAME).put(key, new Employer("Test", 0d)); + + lockLatch.countDown(); + + assertTrue(unlockLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)); + + tx.commit(); + } + }); + + assertTrue(lockLatch.await(TIMEOUT, TimeUnit.MILLISECONDS)); + + return lockKeyFut; + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java index af08a923f511a..f4e123c6a7c40 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java @@ -94,6 +94,7 @@ import org.apache.ignite.internal.processors.query.calcite.rules.OrToUnionRuleTest; import org.apache.ignite.internal.processors.query.calcite.rules.ProjectScanMergeRuleTest; import org.apache.ignite.internal.processors.query.calcite.thin.MultiLineQueryTest; +import org.apache.ignite.internal.processors.tx.TxThreadLockingTest; import org.apache.ignite.internal.processors.tx.TxWithExceptionalInterceptorTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -180,7 +181,7 @@ TxWithExceptionalInterceptorTest.class, UserDefinedTxAwareFunctionsIntegrationTest.class, CacheWithInterceptorIntegrationTest.class, - TxWithExceptionalInterceptorTest.class, + TxThreadLockingTest.class, SelectByKeyFieldTest.class, }) public class IntegrationTestSuite {