Skip to content

Commit 72c907e

Browse files
committed
IGNITE-28396 Fixed service becoming unavailable when a node leaves the cluster.
1 parent 0a15b6f commit 72c907e

13 files changed

Lines changed: 424 additions & 55 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/service/ClientServiceTopologyRequest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.ignite.internal.processors.platform.client.IgniteClientException;
3030
import org.apache.ignite.internal.util.typedef.F;
3131

32+
import static org.apache.ignite.internal.processors.service.GridServiceProxy.DEFAULT_SERVICE_TOPOLOGY_AWAIT_TIMEOUT;
33+
3234
/**
3335
* Request topology of certain service.
3436
*/
@@ -52,7 +54,7 @@ public ClientServiceTopologyRequest(BinaryRawReader reader) {
5254
Map<UUID, Integer> srvcTop;
5355

5456
try {
55-
srvcTop = ctx.kernalContext().service().serviceTopology(name, 0);
57+
srvcTop = ctx.kernalContext().service().serviceTopology(name, DEFAULT_SERVICE_TOPOLOGY_AWAIT_TIMEOUT);
5658
}
5759
catch (IgniteCheckedException e) {
5860
throw new IgniteClientException(ClientStatus.FAILED, "Failed to get topology for service '" + name + "'.", e);

modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@
6969
* Wrapper for making {@link org.apache.ignite.services.Service} class proxies.
7070
*/
7171
public class GridServiceProxy<T> implements Serializable {
72+
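/** Default timeout for awaiting the service topology; used when no explicit (non-zero) timeout is given. */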
73+
public static final long DEFAULT_SERVICE_TOPOLOGY_AWAIT_TIMEOUT = 10_000L;
74+
7275
/** */
7376
private static final long serialVersionUID = 0L;
7477

@@ -111,8 +114,11 @@ public class GridServiceProxy<T> implements Serializable {
111114
/** Whether multi-node request should be done. */
112115
private final boolean sticky;
113116

114-
/** Service availability wait timeout. */
115-
private final long waitTimeout;
117+
/** Service invocation timeout. A timeout of zero is interpreted as an infinite timeout. */
118+
private final long invokeTimeout;
119+
120+
/** Service topology await timeout. */
121+
private final long topWaitTimeout;
116122

117123
/** */
118124
private final boolean keepBinary;
@@ -144,7 +150,8 @@ public GridServiceProxy(ClusterGroup prj,
144150
this.sticky = sticky;
145151
this.keepBinary = keepBinary;
146152

147-
waitTimeout = timeout;
153+
invokeTimeout = timeout;
154+
topWaitTimeout = timeout == 0 ? DEFAULT_SERVICE_TOPOLOGY_AWAIT_TIMEOUT : timeout;
148155
hasLocNode = hasLocalNode(prj);
149156

150157
log = ctx.log(getClass());
@@ -224,7 +231,7 @@ else if (U.isToStringMethod(mtd))
224231
options(Collections.singleton(node))
225232
.withPool(SERVICE_POOL)
226233
.withFailoverDisabled()
227-
.withTimeout(waitTimeout)
234+
.withTimeout(invokeTimeout)
228235
).get());
229236
}
230237
}
@@ -275,8 +282,8 @@ else if (U.isToStringMethod(mtd))
275282
throw new IgniteException(e);
276283
}
277284

278-
if (waitTimeout > 0 && U.currentTimeMillis() - startTime >= waitTimeout)
279-
throw new IgniteException("Service acquire timeout was reached, stopping. [timeout=" + waitTimeout + "]");
285+
if (invokeTimeout > 0 && U.currentTimeMillis() - startTime >= invokeTimeout)
286+
throw new IgniteException("Service invocation timeout was reached, stopping [timeout=" + invokeTimeout + "]");
280287
}
281288
}
282289
finally {
@@ -388,7 +395,7 @@ private ClusterNode randomNodeForService(String name) throws IgniteCheckedExcept
388395
if (hasLocNode && ctx.service().service(name) != null)
389396
return ctx.discovery().localNode();
390397

391-
Map<UUID, Integer> snapshot = ctx.service().serviceTopology(name, waitTimeout);
398+
Map<UUID, Integer> snapshot = ctx.service().serviceTopology(name, topWaitTimeout);
392399

393400
if (snapshot == null || snapshot.isEmpty())
394401
return null;
@@ -397,7 +404,9 @@ private ClusterNode randomNodeForService(String name) throws IgniteCheckedExcept
397404
if (snapshot.size() == 1) {
398405
UUID nodeId = snapshot.keySet().iterator().next();
399406

400-
return prj.node(nodeId);
407+
ClusterNode node = clusterNode(nodeId);
408+
409+
return prj.predicate().apply(node) ? node : null;
401410
}
402411

403412
Collection<ClusterNode> nodes = prj.nodes();
@@ -419,7 +428,7 @@ private ClusterNode randomNodeForService(String name) throws IgniteCheckedExcept
419428
for (Map.Entry<UUID, Integer> e : snapshot.entrySet()) {
420429
if (i++ >= idx) {
421430
if (e.getValue() > 0)
422-
return ctx.discovery().node(e.getKey());
431+
return clusterNode(e.getKey());
423432
}
424433
}
425434

@@ -428,7 +437,7 @@ private ClusterNode randomNodeForService(String name) throws IgniteCheckedExcept
428437
// Circle back.
429438
for (Map.Entry<UUID, Integer> e : snapshot.entrySet()) {
430439
if (e.getValue() > 0)
431-
return ctx.discovery().node(e.getKey());
440+
return clusterNode(e.getKey());
432441

433442
if (i++ == idx)
434443
return null;
@@ -462,6 +471,16 @@ T proxy() {
462471
return proxy;
463472
}
464473

474+
/** Returns the node with the given ID as reported by discovery; throws {@link ClusterTopologyCheckedException} if discovery returns {@code null} for it. */
475+
public ClusterNode clusterNode(UUID nodeId) throws ClusterTopologyCheckedException {
476+
ClusterNode node = ctx.discovery().node(nodeId);
477+
478+
if (node == null)
479+
throw new ClusterTopologyCheckedException("The node holding the service left the cluster [nodeId=" + nodeId + ']');
480+
481+
return node;
482+
}
483+
465484
/**
466485
* @param mtd Method to invoke.
467486
*/

modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -973,7 +973,7 @@ public IgniteInternalFuture<?> cancelAll(@NotNull Collection<String> servicesNam
973973
if (timeout == 0 && desc == null)
974974
return null;
975975

976-
if (desc != null && desc.topologyInitialized())
976+
if (desc != null && !desc.serviceTopology().isTransitional())
977977
return desc.topologySnapshot();
978978

979979
long wait = 0;
@@ -1573,7 +1573,7 @@ void completeInitiatingFuture(boolean deploy, IgniteUuid reqSrvcId, Throwable er
15731573
*
15741574
* @param fullTops Deployment topologies.
15751575
*/
1576-
void updateServicesTopologies(@NotNull final Map<IgniteUuid, Map<UUID, Integer>> fullTops) {
1576+
void updateServicesTopologies(@NotNull final Map<IgniteUuid, ServiceTopology> fullTops) {
15771577
if (!enterBusy())
15781578
return;
15791579

@@ -1943,7 +1943,7 @@ private void processDynamicCacheChangeRequest(DynamicCacheChangeBatch msg) {
19431943
* @param msg Message.
19441944
*/
19451945
private void processServicesFullDeployments(ServiceClusterDeploymentResultBatch msg) {
1946-
final Map<IgniteUuid, Map<UUID, Integer>> fullTops = new HashMap<>();
1946+
final Map<IgniteUuid, ServiceTopology> fullTops = new HashMap<>();
19471947
final Map<IgniteUuid, Collection<Throwable>> fullErrors = new HashMap<>();
19481948

19491949
for (ServiceClusterDeploymentResult depRes : msg.results()) {
@@ -1966,7 +1966,7 @@ private void processServicesFullDeployments(ServiceClusterDeploymentResultBatch
19661966
if (!errors.isEmpty())
19671967
fullErrors.computeIfAbsent(srvcId, e -> new ArrayList<>()).addAll(errors);
19681968

1969-
fullTops.put(srvcId, top);
1969+
fullTops.put(srvcId, new ServiceTopology(top, depRes.isServiceTopologyTransitional()));
19701970
}
19711971

19721972
synchronized (servicesTopsUpdateMux) {
@@ -1997,14 +1997,12 @@ private void processServicesFullDeployments(ServiceClusterDeploymentResultBatch
19971997
* @param services Services info to update.
19981998
* @param tops Deployment topologies.
19991999
*/
2000-
private void updateServicesMap(Map<IgniteUuid, ServiceInfo> services,
2001-
Map<IgniteUuid, Map<UUID, Integer>> tops) {
2002-
2000+
private void updateServicesMap(Map<IgniteUuid, ServiceInfo> services, Map<IgniteUuid, ServiceTopology> tops) {
20032001
tops.forEach((srvcId, top) -> {
20042002
ServiceInfo desc = services.get(srvcId);
20052003

20062004
if (desc != null)
2007-
desc.topologySnapshot(top);
2005+
desc.updateServiceTopology(top);
20082006
});
20092007
}
20102008

modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceClusterDeploymentResult.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ public class ServiceClusterDeploymentResult implements Message {
4343
@GridToStringInclude
4444
Map<UUID, ServiceSingleNodeDeploymentResult> results;
4545

46+
/**
47+
* Whether topology is transitional. Nodes may leave the cluster while the service topology is being recalculated.
48+
* In this case, the resulting service topology may be incomplete. We consider the mentioned service topology
49+
* transitional and expect it to be recalculated soon.
50+
*/
51+
@Order(2)
52+
boolean isSvcTopTransitional;
53+
4654
/** Default constructor for {@link MessageFactory}. */
4755
public ServiceClusterDeploymentResult() {
4856
}
@@ -51,8 +59,10 @@ public ServiceClusterDeploymentResult() {
5159
* @param srvcId Service id.
5260
* @param results Deployments results.
5361
*/
54-
public ServiceClusterDeploymentResult(@NotNull IgniteUuid srvcId,
55-
@NotNull Map<UUID, ServiceSingleNodeDeploymentResult> results) {
62+
public ServiceClusterDeploymentResult(
63+
@NotNull IgniteUuid srvcId,
64+
@NotNull Map<UUID, ServiceSingleNodeDeploymentResult> results
65+
) {
5666
this.srvcId = srvcId;
5767
this.results = results;
5868
}
@@ -71,6 +81,20 @@ public Map<UUID, ServiceSingleNodeDeploymentResult> results() {
7181
return Collections.unmodifiableMap(results);
7282
}
7383

84+
/** @return {@code true} if the service topology of this result has been marked as transitional. */
85+
public boolean isServiceTopologyTransitional() {
86+
return isSvcTopTransitional;
87+
}
88+
89+
/**
90+
* Marks topology as transitional. Nodes may leave the cluster while the service topology is being recalculated.
91+
* In this case, the resulting service topology may be incomplete. We consider the mentioned service topology
92+
* transitional and expect it to be recalculated soon.
93+
*/
94+
public void markServiceTopologyTransitional() {
95+
isSvcTopTransitional = true;
96+
}
97+
7498
/** {@inheritDoc} */
7599
@Override public String toString() {
76100
return S.toString(ServiceClusterDeploymentResult.class, this);

modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.Collections;
2222
import java.util.HashMap;
2323
import java.util.Map;
24-
import java.util.UUID;
2524
import org.apache.ignite.internal.GridKernalContext;
2625
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
2726
import org.apache.ignite.internal.util.typedef.F;
@@ -43,7 +42,7 @@ public class ServiceDeploymentActions {
4342
private Map<IgniteUuid, ServiceInfo> servicesToUndeploy;
4443

4544
/** Services deployment topologies. */
46-
private Map<IgniteUuid, Map<UUID, Integer>> depTops;
45+
private Map<IgniteUuid, ServiceTopology> depTops;
4746

4847
/** Services deployment errors. */
4948
private Map<IgniteUuid, Collection<Throwable>> depErrors;
@@ -118,15 +117,15 @@ public boolean deactivate() {
118117
/**
119118
* @return Deployment topologies.
120119
*/
121-
@NotNull public Map<IgniteUuid, Map<UUID, Integer>> deploymentTopologies() {
120+
@NotNull public Map<IgniteUuid, ServiceTopology> deploymentTopologies() {
122121
return depTops != null ? depTops : Collections.emptyMap();
123122
}
124123

125124
/**
126125
* @param depTops Deployment topologies.
127126
*/
128-
public void deploymentTopologies(@NotNull Map<IgniteUuid, Map<UUID, Integer>> depTops) {
129-
this.depTops = Collections.unmodifiableMap(new HashMap<>(depTops));
127+
public void deploymentTopologies(@NotNull Map<IgniteUuid, ServiceTopology> depTops) {
128+
this.depTops = Collections.unmodifiableMap(depTops);
130129
}
131130

132131
/**
@@ -140,6 +139,6 @@ public void deploymentTopologies(@NotNull Map<IgniteUuid, Map<UUID, Integer>> de
140139
* @param depErrors Deployment errors.
141140
*/
142141
public void deploymentErrors(@NotNull Map<IgniteUuid, Collection<Throwable>> depErrors) {
143-
this.depErrors = Collections.unmodifiableMap(new HashMap<>(depErrors));
142+
this.depErrors = Collections.unmodifiableMap(depErrors);
144143
}
145144
}

0 commit comments

Comments
 (0)