Skip to content

Commit dd1cb74

Browse files
IGNITE-28734 Process control.sh messages in management pool - Fixes #13192.
Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
1 parent 79e1e40 commit dd1cb74

9 files changed

Lines changed: 158 additions & 12 deletions

File tree

modules/control-utility/src/test/java/org/apache/ignite/testsuites/IgniteControlUtilityTestSuite.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.ignite.util.GridCommandHandlerIndexingTest;
3535
import org.apache.ignite.util.GridCommandHandlerIndexingWithSSLTest;
3636
import org.apache.ignite.util.GridCommandHandlerLegacyClientTest;
37+
import org.apache.ignite.util.GridCommandHandlerManagementPoolTest;
3738
import org.apache.ignite.util.GridCommandHandlerMetadataTest;
3839
import org.apache.ignite.util.GridCommandHandlerSslTest;
3940
import org.apache.ignite.util.GridCommandHandlerTest;
@@ -80,7 +81,8 @@
8081
BaselineEventsRemoteTest.class,
8182

8283
GridCommandHandlerWalTest.class,
83-
GridCommandHandlerCheckpointTest.class
84+
GridCommandHandlerCheckpointTest.class,
85+
GridCommandHandlerManagementPoolTest.class,
8486
})
8587
public class IgniteControlUtilityTestSuite {
8688
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.util;
19+
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
import org.apache.ignite.Ignite;
23+
import org.apache.ignite.cache.query.SqlFieldsQuery;
24+
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
25+
import org.apache.ignite.client.IgniteClient;
26+
import org.apache.ignite.cluster.ClusterState;
27+
import org.apache.ignite.configuration.CacheConfiguration;
28+
import org.apache.ignite.configuration.ClientConfiguration;
29+
import org.apache.ignite.configuration.ClientConnectorConfiguration;
30+
import org.apache.ignite.internal.IgniteInternalFuture;
31+
import org.apache.ignite.internal.processors.query.QueryUtils;
32+
import org.apache.ignite.testframework.GridTestUtils;
33+
import org.junit.Test;
34+
35+
import static org.apache.ignite.Ignition.startClient;
36+
import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
37+
38+
/**
39+
* Tests management pool usage for management tasks.
40+
*/
41+
public class GridCommandHandlerManagementPoolTest extends GridCommandHandlerClusterPerMethodAbstractTest {
42+
/** */
43+
private static final long TIMEOUT = 10_000L;
44+
45+
/** */
46+
@Test
47+
public void testManagementTasksWorksWhenClientPoolBlocked() throws Exception {
48+
Ignite ignite = startGrid(0);
49+
50+
assertEquals(EXIT_CODE_OK, execute("--set-state", ClusterState.ACTIVE.name()));
51+
52+
ignite.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
53+
.setSqlSchema(QueryUtils.DFLT_SCHEMA)
54+
.setSqlFunctionClasses(TestSqlFunctions.class)
55+
.setIndexedTypes(Integer.class, String.class)
56+
);
57+
58+
TestSqlFunctions.latch = new CountDownLatch(1);
59+
60+
try (IgniteClient client = startClient(new ClientConfiguration().setAddresses("127.0.0.1:10800"))) {
61+
// Block client pool by SQL queries.
62+
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(
63+
() -> client.query(new SqlFieldsQuery("SELECT wait_latch()")).getAll(),
64+
ClientConnectorConfiguration.DFLT_THREAD_POOL_SIZE, "client-thread");
65+
66+
// Check that management tasks still can be processed.
67+
assertEquals(EXIT_CODE_OK, execute("--state")); // Native command.
68+
assertEquals(EXIT_CODE_OK, execute("--checkpoint")); // Multi-node task command.
69+
70+
TestSqlFunctions.latch.countDown();
71+
72+
fut.get(TIMEOUT, TimeUnit.MILLISECONDS);
73+
}
74+
}
75+
76+
/** */
77+
public static class TestSqlFunctions {
78+
/** */
79+
private static CountDownLatch latch;
80+
81+
/** */
82+
@QuerySqlFunction
83+
public static boolean wait_latch() {
84+
try {
85+
latch.await(TIMEOUT, TimeUnit.MILLISECONDS);
86+
}
87+
catch (InterruptedException ignored) {
88+
return false;
89+
}
90+
91+
return true;
92+
}
93+
}
94+
}

modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.ignite.plugin.security.SecurityCredentials;
3333
import org.jetbrains.annotations.Nullable;
3434

35+
import static org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.MANAGEMENT_CLIENT_ATTR;
3536
import static org.apache.ignite.plugin.security.SecuritySubjectType.REMOTE_CLIENT;
3637

3738
/**
@@ -56,6 +57,9 @@ public abstract class ClientListenerAbstractConnectionContext implements ClientL
5657
/** User attributes. */
5758
protected Map<String, String> userAttrs;
5859

60+
/** If client is management. */
61+
private volatile Boolean managementClient;
62+
5963
/**
6064
* Describes the client connection:
6165
* - thin cli: "cli:host:port@user_name"
@@ -138,11 +142,13 @@ protected void authenticate(GridNioSession ses, String user, String pwd) throws
138142
}
139143

140144
/** */
141-
protected void initClientDescriptor(String prefix) {
145+
protected void initClientContext(String prefix) {
142146
clientDesc = prefix + ":" + ses.remoteAddress().getHostString() + ":" + ses.remoteAddress().getPort();
143147

144148
if (secCtx != null)
145149
clientDesc += "@" + secCtx.subject().login();
150+
151+
managementClient = Boolean.parseBoolean(attributes().get(MANAGEMENT_CLIENT_ATTR));
146152
}
147153

148154
/**
@@ -196,4 +202,9 @@ public int nextTxId() {
196202
@Override public Map<String, String> attributes() {
197203
return F.isEmpty(userAttrs) ? Collections.emptyMap() : Collections.unmodifiableMap(userAttrs);
198204
}
205+
206+
/** {@inheritDoc} */
207+
@Override public boolean managementClient() {
208+
return managementClient;
209+
}
199210
}

modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import org.apache.ignite.internal.util.nio.GridNioSession;
2525
import org.jetbrains.annotations.Nullable;
2626

27-
import static org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.MANAGEMENT_CLIENT_ATTR;
28-
2927
/**
3028
* SQL listener connection context.
3129
*/
@@ -94,7 +92,5 @@ void initializeFromHandshake(GridNioSession ses, ClientListenerProtocolVersion v
9492
/**
9593
* @return {@code True} if client is management.
9694
*/
97-
default boolean managementClient() {
98-
return Boolean.parseBoolean(attributes().get(MANAGEMENT_CLIENT_ATTR));
99-
}
95+
boolean managementClient();
10096
}

modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
6060
import org.apache.ignite.internal.util.typedef.F;
6161
import org.apache.ignite.internal.util.typedef.internal.U;
62+
import org.apache.ignite.internal.util.worker.GridWorker;
63+
import org.apache.ignite.internal.util.worker.GridWorkerPool;
6264
import org.apache.ignite.lang.IgniteBiInClosure;
6365
import org.apache.ignite.metric.MetricRegistry;
6466
import org.apache.ignite.mxbean.ClientProcessorMXBean;
@@ -122,6 +124,9 @@ public class ClientListenerProcessor extends GridProcessorAdapter {
122124
/** Executor service. */
123125
private ExecutorService execSvc;
124126

127+
/** Management pool. */
128+
private GridWorkerPool mgmtPool;
129+
125130
/** Thin client distributed configuration. */
126131
private DistributedThinClientConfiguration distrThinCfg;
127132

@@ -161,6 +166,7 @@ public ClientListenerProcessor(GridKernalContext ctx) {
161166
}
162167

163168
execSvc = ctx.pools().getThinClientExecutorService();
169+
mgmtPool = new GridWorkerPool(ctx.pools().getManagementExecutorService(), log);
164170

165171
Exception lastErr = null;
166172

@@ -435,10 +441,43 @@ private void unregisterMBean() {
435441
else {
436442
connCtx.handler().registerRequest(reqId, cmdType);
437443

438-
super.onMessageReceived(ses, msg);
444+
onMessageReceived(ses, connCtx, msg);
439445
}
440446
}
441447
else
448+
onMessageReceived(ses, connCtx, msg);
449+
}
450+
451+
/** */
452+
private void onMessageReceived(
453+
GridNioSession ses,
454+
@Nullable ClientListenerConnectionContext connCtx,
455+
Object msg
456+
) throws IgniteCheckedException {
457+
if (connCtx == null) {
458+
// Process handshake in NIO thread.
459+
try {
460+
proceedMessageReceived(ses, msg);
461+
}
462+
catch (IgniteCheckedException e) {
463+
handleException(ses, e);
464+
}
465+
}
466+
else if (connCtx.managementClient()) {
467+
// Process management messages in management pool.
468+
mgmtPool.execute(
469+
new GridWorker(ctx.igniteInstanceName(), "management-message-received-notify", log) {
470+
@Override protected void body() {
471+
try {
472+
proceedMessageReceived(ses, msg);
473+
}
474+
catch (IgniteCheckedException e) {
475+
handleException(ses, e);
476+
}
477+
}
478+
});
479+
}
480+
else // Process regular messages in client-listener pool.
442481
super.onMessageReceived(ses, msg);
443482
}
444483
};
@@ -488,6 +527,10 @@ private void unregisterMBean() {
488527

489528
execSvc = null;
490529

530+
mgmtPool.join(cancel);
531+
532+
mgmtPool = null;
533+
491534
if (!U.IGNITE_MBEANS_DISABLED)
492535
unregisterMBean();
493536

modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public JdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpin
267267

268268
protoCtx = new JdbcProtocolContext(ver, features, true);
269269

270-
initClientDescriptor("jdbc-thin");
270+
initClientContext("jdbc-thin");
271271

272272
parser = new JdbcMessageParser(ctx, protoCtx);
273273

modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public OdbcConnectionContext(GridKernalContext ctx, GridNioSession ses, GridSpin
198198
}
199199
};
200200

201-
initClientDescriptor("odbc");
201+
initClientContext("odbc");
202202

203203
handler = new OdbcRequestHandler(ctx, busyLock, snd, maxCursors, distributedJoins, enforceJoinOrder,
204204
replicatedOnly, collocated, skipReducerOnUpdate, qryEngine, ver, this);

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public ClientProtocolContext currentProtocolContext() {
226226

227227
authenticate(ses, user, pwd);
228228

229-
initClientDescriptor("cli");
229+
initClientContext("cli");
230230

231231
handler = new ClientRequestHandler(this, currentProtocolContext);
232232
parser = new ClientMessageParser(this, currentProtocolContext);

modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioAsyncNotifyFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public GridNioAsyncNotifyFilter(String igniteInstanceName, Executor exec, Ignite
139139
* @param ses Session.
140140
* @param ex Exception.
141141
*/
142-
private void handleException(GridNioSession ses, IgniteCheckedException ex) {
142+
protected void handleException(GridNioSession ses, IgniteCheckedException ex) {
143143
try {
144144
proceedExceptionCaught(ses, ex);
145145
}

0 commit comments

Comments
 (0)