Skip to content

Commit 45c02d6

Browse files
IGNITE-28568 SQL Calcite: Fix sync tx key waiting for DML operations - Fixes #13042.
Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
1 parent 7a27bfe commit 45c02d6

4 files changed

Lines changed: 291 additions & 19 deletions

File tree

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.calcite.rel.core.TableModify;
3030
import org.apache.calcite.rel.type.RelDataType;
3131
import org.apache.ignite.IgniteCheckedException;
32+
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
33+
import org.apache.ignite.internal.IgniteInternalFuture;
3234
import org.apache.ignite.internal.cache.context.SessionContextImpl;
3335
import org.apache.ignite.internal.processors.cache.GridCacheContext;
3436
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -50,6 +52,12 @@
5052
*
5153
*/
5254
public class ModifyNode<Row> extends AbstractNode<Row> implements SingleNode<Row>, Downstream<Row> {
55+
/**
56+
* Timeout to wait for async invoke operation to complete. In case this timeout exceeded, release thread and
57+
* continue execution after invoke operation finished.
58+
*/
59+
private static final long INVOKE_TIMEOUT = 100;
60+
5361
/** */
5462
protected final CacheTableDescriptor desc;
5563

@@ -77,6 +85,9 @@ public class ModifyNode<Row> extends AbstractNode<Row> implements SingleNode<Row
7785
/** */
7886
private State state = State.UPDATING;
7987

88+
/** */
89+
private IgniteInternalFuture<Map<Object, EntryProcessorResult<Long>>> invokeFut;
90+
8091
/**
8192
* @param ctx Execution context.
8293
* @param desc Table descriptor.
@@ -119,22 +130,23 @@ public ModifyNode(
119130

120131
waiting--;
121132

122-
switch (op) {
123-
case DELETE:
124-
case UPDATE:
125-
case INSERT:
126-
case MERGE:
127-
tuples.add(desc.toTuple(context(), row, op, cols));
133+
tuples.add(desc.toTuple(context(), row, op, cols));
128134

129-
flushTuples(false);
135+
if (invokeFut != null) // Still waiting for previous invocation result.
136+
return;
130137

131-
break;
132-
default:
133-
throw new UnsupportedOperationException(op.name());
134-
}
138+
flushTuples(false);
135139

136-
if (waiting == 0)
137-
source().request(waiting = MODIFY_BATCH_SIZE);
140+
if (invokeFut != null) {
141+
invokeFut.listen(f -> {
142+
// Push new task to execute in correct thread.
143+
context().execute(() -> {
144+
processInvokeResult(f.get());
145+
146+
flushTuples(false);
147+
}, this::onError);
148+
});
149+
}
138150
}
139151

140152
/** {@inheritDoc} */
@@ -171,8 +183,27 @@ private void tryEnd() throws Exception {
171183
source().request(waiting = MODIFY_BATCH_SIZE);
172184

173185
if (state == State.UPDATED && requested > 0) {
186+
if (invokeFut != null) { // Still waiting for previous invocation result.
187+
invokeFut.listen(f -> context().execute(this::tryEnd, this::onError));
188+
189+
return;
190+
}
191+
174192
flushTuples(true);
175193

194+
if (invokeFut != null) {
195+
invokeFut.listen(f -> {
196+
// Push new task to execute in correct thread.
197+
context().execute(() -> {
198+
processInvokeResult(f.get());
199+
200+
tryEnd();
201+
}, this::onError);
202+
});
203+
204+
return;
205+
}
206+
176207
state = State.END;
177208

178209
inLoop = true;
@@ -193,7 +224,7 @@ private void tryEnd() throws Exception {
193224

194225
/** */
195226
@SuppressWarnings("unchecked")
196-
private void flushTuples(boolean force) throws IgniteCheckedException {
227+
private void flushTuples(boolean force) throws Exception {
197228
if (F.isEmpty(tuples) || !force && tuples.size() < MODIFY_BATCH_SIZE)
198229
return;
199230

@@ -225,15 +256,30 @@ private void flushTuples(boolean force) throws IgniteCheckedException {
225256
private void invokeOutsideTransaction(
226257
List<ModifyTuple> tuples,
227258
IgniteInternalCache<Object, Object> cache
228-
) throws IgniteCheckedException {
259+
) throws Exception {
229260
SessionContextImpl sesCtx = context().unwrap(SessionContextImpl.class);
230261
Map<String, String> sesAttrs = sesCtx == null ? null : sesCtx.attributes();
231262

232263
if (sesAttrs != null)
233264
cache = cache.withApplicationAttributes(sesAttrs);
234265

235266
Map<Object, EntryProcessor<Object, Object, Long>> map = invokeMap(tuples);
236-
Map<Object, EntryProcessorResult<Long>> res = cacheForDML(cache).invokeAll(map);
267+
invokeFut = cacheForDML(cache).invokeAllAsync(map);
268+
269+
try {
270+
// Shortcut - give a chance for operation to be executed in sync mode (it will simplify workflow).
271+
Map<Object, EntryProcessorResult<Long>> res = invokeFut.get(INVOKE_TIMEOUT);
272+
273+
processInvokeResult(res);
274+
}
275+
catch (IgniteFutureTimeoutCheckedException ignore) {
276+
// No-op. Result processing task will be scheduled by caller if invokeFut != null.
277+
}
278+
}
279+
280+
/** */
281+
private void processInvokeResult(Map<Object, EntryProcessorResult<Long>> res) throws Exception {
282+
invokeFut = null;
237283

238284
long updated = res.values().stream().mapToLong(EntryProcessorResult::get).sum();
239285

@@ -247,6 +293,9 @@ private void invokeOutsideTransaction(
247293
}
248294

249295
updatedRows += updated;
296+
297+
if (waiting == 0)
298+
source().request(waiting = MODIFY_BATCH_SIZE);
250299
}
251300

252301
/**
@@ -260,7 +309,7 @@ private void invokeInsideTransaction(
260309
List<ModifyTuple> tuples,
261310
IgniteInternalCache<Object, Object> cache,
262311
GridNearTxLocal userTx
263-
) throws IgniteCheckedException {
312+
) throws Exception {
264313
userTx.resume();
265314

266315
try {

modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ public class AbstractExecutionTest extends GridCommonAbstractTest {
8787
protected static final String PARAMS_STRING = "Task executor = {0}, Execution strategy = {1}";
8888

8989
/** */
90-
protected static final int IN_BUFFER_SIZE = AbstractNode.IN_BUFFER_SIZE;
90+
public static final int IN_BUFFER_SIZE = AbstractNode.IN_BUFFER_SIZE;
91+
92+
/** */
93+
public static final int MODIFY_BATCH_SIZE = AbstractNode.MODIFY_BATCH_SIZE;
9194

9295
/** */
9396
private Throwable lastE;
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.tx;
19+
20+
import java.util.Collection;
21+
import java.util.List;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
import org.apache.ignite.IgniteCache;
25+
import org.apache.ignite.IgniteCheckedException;
26+
import org.apache.ignite.cache.CacheAtomicityMode;
27+
import org.apache.ignite.configuration.CacheConfiguration;
28+
import org.apache.ignite.configuration.IgniteConfiguration;
29+
import org.apache.ignite.internal.IgniteInternalFuture;
30+
import org.apache.ignite.internal.processors.query.IgniteSQLException;
31+
import org.apache.ignite.internal.processors.query.calcite.exec.rel.AbstractExecutionTest;
32+
import org.apache.ignite.internal.processors.query.calcite.integration.AbstractBasicIntegrationTest;
33+
import org.apache.ignite.internal.util.typedef.X;
34+
import org.apache.ignite.testframework.GridTestUtils;
35+
import org.apache.ignite.transactions.Transaction;
36+
import org.junit.Test;
37+
import org.junit.runner.RunWith;
38+
import org.junit.runners.Parameterized;
39+
40+
import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR;
41+
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
42+
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
43+
44+
/** */
45+
@RunWith(Parameterized.class)
46+
public class TxThreadLockingTest extends AbstractBasicIntegrationTest {
47+
/** */
48+
private static final long TIMEOUT = 10_000L;
49+
50+
/** */
51+
private static final int POOL_SIZE = 10;
52+
53+
/** */
54+
private static final int IN_BUFFER_SIZE = AbstractExecutionTest.IN_BUFFER_SIZE;
55+
56+
/** */
57+
private static final int MODIFY_BATCH_SIZE = AbstractExecutionTest.MODIFY_BATCH_SIZE;
58+
59+
/** Use query blocking executor. */
60+
@Parameterized.Parameter(0)
61+
public boolean qryBlockingExecutor;
62+
63+
/** */
64+
@Parameterized.Parameters(name = "qryBlockingExecutor={0}")
65+
public static Collection<?> parameters() {
66+
return List.of(false, true);
67+
}
68+
69+
/** {@inheritDoc} */
70+
@Override protected void beforeTestsStarted() throws Exception {
71+
// No-op.
72+
}
73+
74+
/** {@inheritDoc} */
75+
@Override protected void beforeTest() throws Exception {
76+
System.setProperty(IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR, String.valueOf(qryBlockingExecutor));
77+
78+
startGrid(0);
79+
80+
client = startClientGrid();
81+
}
82+
83+
/** {@inheritDoc} */
84+
@Override protected void afterTest() throws Exception {
85+
stopAllGrids();
86+
87+
System.clearProperty(IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR);
88+
}
89+
90+
/** {@inheritDoc} */
91+
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
92+
return super.getConfiguration(igniteInstanceName).setQueryThreadPoolSize(POOL_SIZE);
93+
}
94+
95+
/** {@inheritDoc} */
96+
@Override protected <K, V> CacheConfiguration<K, V> cacheConfiguration() {
97+
return super.<K, V>cacheConfiguration().setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
98+
}
99+
100+
/** */
101+
@Test
102+
public void testThreadsLockingByDml() throws Exception {
103+
createAndPopulateTable();
104+
105+
CountDownLatch unlockLatch = new CountDownLatch(1);
106+
107+
IgniteInternalFuture<?> lockKeyFut = lockKeyAndWaitForLatch(0, unlockLatch);
108+
109+
IgniteInternalFuture<?> updFut = GridTestUtils.runMultiThreadedAsync(() -> {
110+
sql("UPDATE person SET salary = 1 WHERE id = 0");
111+
}, POOL_SIZE + 1, "update-thread");
112+
113+
// Ensure update queries are blocked.
114+
assertFalse(GridTestUtils.waitForCondition(updFut::isDone, 200));
115+
116+
IgniteInternalFuture<?> selectFut = GridTestUtils.runMultiThreadedAsync(() -> {
117+
for (int i = 0; i < 100; i++)
118+
sql("SELECT * FROM person WHERE id = 1");
119+
}, POOL_SIZE + 1, "select-thread");
120+
121+
// Ensure other queries are not blocked by update queries.
122+
selectFut.get(TIMEOUT, TimeUnit.MILLISECONDS);
123+
124+
unlockLatch.countDown();
125+
126+
lockKeyFut.get();
127+
128+
// Ensure update queries are released.
129+
updFut.get(TIMEOUT, TimeUnit.MILLISECONDS);
130+
}
131+
132+
/** */
133+
@Test
134+
public void testDifferentBlockedBatchSize() throws Exception {
135+
assertTrue("Unexpected constants [MODIFY_BATCH_SIZE=" + MODIFY_BATCH_SIZE +
136+
", IN_BUFFER_SIZE=" + IN_BUFFER_SIZE + ']', MODIFY_BATCH_SIZE < IN_BUFFER_SIZE);
137+
138+
createAndPopulateTable();
139+
140+
IgniteCache<Integer, Employer> cache = client.cache(TABLE_NAME);
141+
142+
for (int i = 0; i < IN_BUFFER_SIZE + 1; i++)
143+
cache.put(i, new Employer("Test" + i, (double)i));
144+
145+
for (int size : new int[] {MODIFY_BATCH_SIZE, MODIFY_BATCH_SIZE + 1, IN_BUFFER_SIZE, IN_BUFFER_SIZE + 1}) {
146+
log.info("Blocked batch size: " + size);
147+
148+
CountDownLatch unlockLatch = new CountDownLatch(1);
149+
150+
IgniteInternalFuture<?> lockKeyFut = lockKeyAndWaitForLatch(0, unlockLatch);
151+
152+
IgniteInternalFuture<List<List<?>>> updFut = GridTestUtils.runAsync(() ->
153+
sql("UPDATE person SET salary = 1 WHERE id < ?", size));
154+
155+
// Ensure update query is blocked.
156+
assertFalse(GridTestUtils.waitForCondition(updFut::isDone, 200));
157+
158+
unlockLatch.countDown();
159+
160+
lockKeyFut.get();
161+
162+
List<List<?>> res = updFut.get(TIMEOUT, TimeUnit.MILLISECONDS);
163+
164+
assertEquals((long)size, res.get(0).get(0));
165+
}
166+
}
167+
168+
/** */
169+
@Test
170+
public void testErrorAfterAsyncWait() throws Exception {
171+
createAndPopulateTable();
172+
173+
CountDownLatch unlockLatch = new CountDownLatch(1);
174+
175+
IgniteInternalFuture<?> lockKeyFut = lockKeyAndWaitForLatch(0, unlockLatch);
176+
177+
IgniteInternalFuture<List<List<?>>> insFut = GridTestUtils.runAsync(() ->
178+
sql("INSERT INTO person (id, name, salary) VALUES (0, 'Test', 0)"));
179+
180+
// Ensure insert query is blocked.
181+
assertFalse(GridTestUtils.waitForCondition(insFut::isDone, 200));
182+
183+
unlockLatch.countDown();
184+
185+
lockKeyFut.get();
186+
187+
// Ensure exception is propagated to query initiator after async batch execution finished.
188+
try {
189+
insFut.get(TIMEOUT, TimeUnit.MILLISECONDS);
190+
191+
fail("Exception wasn't thrown");
192+
}
193+
catch (IgniteCheckedException expected) {
194+
assertTrue(X.hasCause(expected, "Failed to INSERT some keys because they are already in cache",
195+
IgniteSQLException.class));
196+
}
197+
}
198+
199+
/** */
200+
private IgniteInternalFuture<?> lockKeyAndWaitForLatch(int key, CountDownLatch unlockLatch) throws Exception {
201+
CountDownLatch lockLatch = new CountDownLatch(1);
202+
203+
IgniteInternalFuture<?> lockKeyFut = GridTestUtils.runAsync(() -> {
204+
try (Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
205+
client.cache(TABLE_NAME).put(key, new Employer("Test", 0d));
206+
207+
lockLatch.countDown();
208+
209+
assertTrue(unlockLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
210+
211+
tx.commit();
212+
}
213+
});
214+
215+
assertTrue(lockLatch.await(TIMEOUT, TimeUnit.MILLISECONDS));
216+
217+
return lockKeyFut;
218+
}
219+
}

0 commit comments

Comments
 (0)