Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.ignite.internal.processors.platform.client.IgniteClientException;
import org.apache.ignite.internal.util.typedef.F;

import static org.apache.ignite.internal.processors.service.GridServiceProxy.DEFAULT_SERVICE_TOPOLOGY_AWAIT_TIMEOUT;

/**
* Request topology of certain service.
*/
Expand All @@ -52,7 +54,7 @@ public ClientServiceTopologyRequest(BinaryRawReader reader) {
Map<UUID, Integer> srvcTop;

try {
srvcTop = ctx.kernalContext().service().serviceTopology(name, 0);
srvcTop = ctx.kernalContext().service().serviceTopology(name, DEFAULT_SERVICE_TOPOLOGY_AWAIT_TIMEOUT);
}
catch (IgniteCheckedException e) {
throw new IgniteClientException(ClientStatus.FAILED, "Failed to get topology for service '" + name + "'.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@
* Wrapper for making {@link org.apache.ignite.services.Service} class proxies.
*/
public class GridServiceProxy<T> implements Serializable {
/** */
public static final long DEFAULT_SERVICE_TOPOLOGY_AWAIT_TIMEOUT = 10_000L;

/** */
private static final long serialVersionUID = 0L;

Expand Down Expand Up @@ -111,8 +114,11 @@ public class GridServiceProxy<T> implements Serializable {
/** Whether multi-node request should be done. */
private final boolean sticky;

/** Service availability wait timeout. */
private final long waitTimeout;
/** Service invocation timeout. A timeout of zero is interpreted as an infinite timeout. */
private final long invokeTimeout;

/** Service topology await timeout. */
private final long topWaitTimeout;

/** */
private final boolean keepBinary;
Expand Down Expand Up @@ -144,7 +150,8 @@ public GridServiceProxy(ClusterGroup prj,
this.sticky = sticky;
this.keepBinary = keepBinary;

waitTimeout = timeout;
invokeTimeout = timeout;
topWaitTimeout = timeout == 0 ? DEFAULT_SERVICE_TOPOLOGY_AWAIT_TIMEOUT : timeout;
hasLocNode = hasLocalNode(prj);

log = ctx.log(getClass());
Expand Down Expand Up @@ -224,7 +231,7 @@ else if (U.isToStringMethod(mtd))
options(Collections.singleton(node))
.withPool(SERVICE_POOL)
.withFailoverDisabled()
.withTimeout(waitTimeout)
.withTimeout(invokeTimeout)
).get());
}
}
Expand Down Expand Up @@ -275,8 +282,8 @@ else if (U.isToStringMethod(mtd))
throw new IgniteException(e);
}

if (waitTimeout > 0 && U.currentTimeMillis() - startTime >= waitTimeout)
throw new IgniteException("Service acquire timeout was reached, stopping. [timeout=" + waitTimeout + "]");
if (invokeTimeout > 0 && U.currentTimeMillis() - startTime >= invokeTimeout)
throw new IgniteException("Service invocation timeout was reached, stopping [timeout=" + invokeTimeout + "]");
}
}
finally {
Expand Down Expand Up @@ -388,7 +395,7 @@ private ClusterNode randomNodeForService(String name) throws IgniteCheckedExcept
if (hasLocNode && ctx.service().service(name) != null)
return ctx.discovery().localNode();

Map<UUID, Integer> snapshot = ctx.service().serviceTopology(name, waitTimeout);
Map<UUID, Integer> snapshot = ctx.service().serviceTopology(name, topWaitTimeout);

if (snapshot == null || snapshot.isEmpty())
return null;
Expand All @@ -397,7 +404,9 @@ private ClusterNode randomNodeForService(String name) throws IgniteCheckedExcept
if (snapshot.size() == 1) {
UUID nodeId = snapshot.keySet().iterator().next();

return prj.node(nodeId);
ClusterNode node = clusterNode(nodeId);

return prj.predicate().apply(node) ? node : null;
}

Collection<ClusterNode> nodes = prj.nodes();
Expand All @@ -419,7 +428,7 @@ private ClusterNode randomNodeForService(String name) throws IgniteCheckedExcept
for (Map.Entry<UUID, Integer> e : snapshot.entrySet()) {
if (i++ >= idx) {
if (e.getValue() > 0)
return ctx.discovery().node(e.getKey());
return clusterNode(e.getKey());
}
}

Expand All @@ -428,7 +437,7 @@ private ClusterNode randomNodeForService(String name) throws IgniteCheckedExcept
// Circle back.
for (Map.Entry<UUID, Integer> e : snapshot.entrySet()) {
if (e.getValue() > 0)
return ctx.discovery().node(e.getKey());
return clusterNode(e.getKey());

if (i++ == idx)
return null;
Expand Down Expand Up @@ -462,6 +471,16 @@ T proxy() {
return proxy;
}

/** */
public ClusterNode clusterNode(UUID nodeId) throws ClusterTopologyCheckedException {
ClusterNode node = ctx.discovery().node(nodeId);

if (node == null)
throw new ClusterTopologyCheckedException("The node holding the service left the cluster [nodeId=" + nodeId + ']');

return node;
}

/**
* @param mtd Method to invoke.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ public IgniteInternalFuture<?> cancelAll(@NotNull Collection<String> servicesNam
if (timeout == 0 && desc == null)
return null;

if (desc != null && desc.topologyInitialized())
if (desc != null && !desc.serviceTopology().isTransitional())
return desc.topologySnapshot();

long wait = 0;
Expand Down Expand Up @@ -1573,7 +1573,7 @@ void completeInitiatingFuture(boolean deploy, IgniteUuid reqSrvcId, Throwable er
*
* @param fullTops Deployment topologies.
*/
void updateServicesTopologies(@NotNull final Map<IgniteUuid, Map<UUID, Integer>> fullTops) {
void updateServicesTopologies(@NotNull final Map<IgniteUuid, ServiceTopology> fullTops) {
if (!enterBusy())
return;

Expand Down Expand Up @@ -1943,7 +1943,7 @@ private void processDynamicCacheChangeRequest(DynamicCacheChangeBatch msg) {
* @param msg Message.
*/
private void processServicesFullDeployments(ServiceClusterDeploymentResultBatch msg) {
final Map<IgniteUuid, Map<UUID, Integer>> fullTops = new HashMap<>();
final Map<IgniteUuid, ServiceTopology> fullTops = new HashMap<>();
final Map<IgniteUuid, Collection<Throwable>> fullErrors = new HashMap<>();

for (ServiceClusterDeploymentResult depRes : msg.results()) {
Expand All @@ -1966,7 +1966,7 @@ private void processServicesFullDeployments(ServiceClusterDeploymentResultBatch
if (!errors.isEmpty())
fullErrors.computeIfAbsent(srvcId, e -> new ArrayList<>()).addAll(errors);

fullTops.put(srvcId, top);
fullTops.put(srvcId, new ServiceTopology(top, depRes.isServiceTopologyTransitional()));
}

synchronized (servicesTopsUpdateMux) {
Expand Down Expand Up @@ -1997,14 +1997,12 @@ private void processServicesFullDeployments(ServiceClusterDeploymentResultBatch
* @param services Services info to update.
* @param tops Deployment topologies.
*/
private void updateServicesMap(Map<IgniteUuid, ServiceInfo> services,
Map<IgniteUuid, Map<UUID, Integer>> tops) {

private void updateServicesMap(Map<IgniteUuid, ServiceInfo> services, Map<IgniteUuid, ServiceTopology> tops) {
tops.forEach((srvcId, top) -> {
ServiceInfo desc = services.get(srvcId);

if (desc != null)
desc.topologySnapshot(top);
desc.updateServiceTopology(top);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ public class ServiceClusterDeploymentResult implements Message {
@GridToStringInclude
Map<UUID, ServiceSingleNodeDeploymentResult> results;

/**
* Whether topology is transitional. Nodes may leave the cluster while the service topology is being recalculated.
* In this case, the resulting service topology may be incomplete. We consider the mentioned service topology
* transitional and expect it to be recalculated soon.
*/
@Order(2)
boolean isSvcTopTransitional;

/** Default constructor for {@link MessageFactory}. */
public ServiceClusterDeploymentResult() {
}
Expand All @@ -51,8 +59,10 @@ public ServiceClusterDeploymentResult() {
* @param srvcId Service id.
* @param results Deployments results.
*/
public ServiceClusterDeploymentResult(@NotNull IgniteUuid srvcId,
@NotNull Map<UUID, ServiceSingleNodeDeploymentResult> results) {
public ServiceClusterDeploymentResult(
@NotNull IgniteUuid srvcId,
@NotNull Map<UUID, ServiceSingleNodeDeploymentResult> results
) {
this.srvcId = srvcId;
this.results = results;
}
Expand All @@ -71,6 +81,20 @@ public Map<UUID, ServiceSingleNodeDeploymentResult> results() {
return Collections.unmodifiableMap(results);
}

/** */
public boolean isServiceTopologyTransitional() {
return isSvcTopTransitional;
}

/**
* Marks topology as transitional. Nodes may leave the cluster while the service topology is being recalculated.
* In this case, the resulting service topology may be incomplete. We consider the mentioned service topology
* transitional and expect it to be recalculated soon.
*/
public void markServiceTopologyTransitional() {
isSvcTopTransitional = true;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(ServiceClusterDeploymentResult.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.util.typedef.F;
Expand All @@ -43,7 +42,7 @@ public class ServiceDeploymentActions {
private Map<IgniteUuid, ServiceInfo> servicesToUndeploy;

/** Services deployment topologies. */
private Map<IgniteUuid, Map<UUID, Integer>> depTops;
private Map<IgniteUuid, ServiceTopology> depTops;

/** Services deployment errors. */
private Map<IgniteUuid, Collection<Throwable>> depErrors;
Expand Down Expand Up @@ -118,15 +117,15 @@ public boolean deactivate() {
/**
* @return Deployment topologies.
*/
@NotNull public Map<IgniteUuid, Map<UUID, Integer>> deploymentTopologies() {
@NotNull public Map<IgniteUuid, ServiceTopology> deploymentTopologies() {
return depTops != null ? depTops : Collections.emptyMap();
}

/**
* @param depTops Deployment topologies.
*/
public void deploymentTopologies(@NotNull Map<IgniteUuid, Map<UUID, Integer>> depTops) {
this.depTops = Collections.unmodifiableMap(new HashMap<>(depTops));
public void deploymentTopologies(@NotNull Map<IgniteUuid, ServiceTopology> depTops) {
this.depTops = Collections.unmodifiableMap(depTops);
}

/**
Expand All @@ -140,6 +139,6 @@ public void deploymentTopologies(@NotNull Map<IgniteUuid, Map<UUID, Integer>> de
* @param depErrors Deployment errors.
*/
public void deploymentErrors(@NotNull Map<IgniteUuid, Collection<Throwable>> depErrors) {
this.depErrors = Collections.unmodifiableMap(new HashMap<>(depErrors));
this.depErrors = Collections.unmodifiableMap(depErrors);
}
}
Loading
Loading