Skip to content

Commit d616dca

Browse files
authored
IGNITE-28550 Get rid of external error serialization in ServiceSingleNodeDeploymentResult (#13041)
1 parent c11e51d commit d616dca

File tree

5 files changed

+92
-85
lines changed

5 files changed

+92
-85
lines changed

modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ public CoreMessagesProvider(Marshaller schemaAwareMarhaller, Marshaller schemaLe
390390
// [6300 - 6400]: Services messages. Most of them originally come from Discovery.
391391
msgIdx = 6300;
392392
withNoSchema(ServiceDeploymentProcessId.class);
393-
withNoSchema(ServiceSingleNodeDeploymentResult.class);
393+
withSchema(ServiceSingleNodeDeploymentResult.class);
394394
withNoSchema(ServiceClusterDeploymentResult.class);
395395
withNoSchema(ServiceDeploymentRequest.class);
396396
withNoSchema(ServiceUndeploymentRequest.class);

modules/core/src/main/java/org/apache/ignite/internal/processors/service/IgniteServiceProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1944,14 +1944,14 @@ private void processDynamicCacheChangeRequest(DynamicCacheChangeBatch msg) {
19441944
*/
19451945
private void processServicesFullDeployments(ServiceClusterDeploymentResultBatch msg) {
19461946
final Map<IgniteUuid, Map<UUID, Integer>> fullTops = new HashMap<>();
1947-
final Map<IgniteUuid, Collection<byte[]>> fullErrors = new HashMap<>();
1947+
final Map<IgniteUuid, Collection<Throwable>> fullErrors = new HashMap<>();
19481948

19491949
for (ServiceClusterDeploymentResult depRes : msg.results()) {
19501950
final IgniteUuid srvcId = depRes.serviceId();
19511951
final Map<UUID, ServiceSingleNodeDeploymentResult> deps = depRes.results();
19521952

19531953
final Map<UUID, Integer> top = new HashMap<>();
1954-
final Collection<byte[]> errors = new ArrayList<>();
1954+
final Collection<Throwable> errors = new ArrayList<>();
19551955

19561956
deps.forEach((nodeId, res) -> {
19571957
int cnt = res.count();

modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentActions.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.UUID;
2525
import org.apache.ignite.internal.GridKernalContext;
2626
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
27+
import org.apache.ignite.internal.util.typedef.F;
2728
import org.apache.ignite.lang.IgniteUuid;
2829
import org.apache.ignite.services.ServiceConfiguration;
2930
import org.jetbrains.annotations.NotNull;
@@ -45,7 +46,7 @@ public class ServiceDeploymentActions {
4546
private Map<IgniteUuid, Map<UUID, Integer>> depTops;
4647

4748
/** Services deployment errors. */
48-
private Map<IgniteUuid, Collection<byte[]>> depErrors;
49+
private Map<IgniteUuid, Collection<Throwable>> depErrors;
4950

5051
/** Current platform */
5152
private final String platform;
@@ -131,14 +132,14 @@ public void deploymentTopologies(@NotNull Map<IgniteUuid, Map<UUID, Integer>> de
131132
/**
132133
* @return Deployment errors.
133134
*/
134-
@NotNull public Map<IgniteUuid, Collection<byte[]>> deploymentErrors() {
135-
return depErrors != null ? depErrors : Collections.emptyMap();
135+
@NotNull public Map<IgniteUuid, Collection<Throwable>> deploymentErrors() {
136+
return F.emptyIfNull(depErrors);
136137
}
137138

138139
/**
139140
* @param depErrors Deployment errors.
140141
*/
141-
public void deploymentErrors(@NotNull Map<IgniteUuid, Collection<byte[]>> depErrors) {
142+
public void deploymentErrors(@NotNull Map<IgniteUuid, Collection<Throwable>> depErrors) {
142143
this.depErrors = Collections.unmodifiableMap(new HashMap<>(depErrors));
143144
}
144145
}

modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceDeploymentTask.java

Lines changed: 12 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
4343
import org.apache.ignite.internal.util.typedef.F;
4444
import org.apache.ignite.internal.util.typedef.internal.S;
45-
import org.apache.ignite.internal.util.typedef.internal.U;
4645
import org.apache.ignite.lang.IgniteUuid;
4746
import org.apache.ignite.services.ServiceConfiguration;
4847
import org.jetbrains.annotations.NotNull;
@@ -385,9 +384,9 @@ private void createAndSendSingleDeploymentsMessage(ServiceDeploymentProcessId de
385384

386385
for (IgniteUuid srvcId : depServicesIds) {
387386
ServiceSingleNodeDeploymentResult depRes = new ServiceSingleNodeDeploymentResult(
388-
srvcProc.localInstancesCount(srvcId));
387+
srvcProc.localInstancesCount(srvcId), log);
389388

390-
attachDeploymentErrors(depRes, errors.get(srvcId));
389+
depRes.errors(errors.get(srvcId));
391390

392391
results.put(srvcId, depRes);
393392
}
@@ -397,9 +396,9 @@ private void createAndSendSingleDeploymentsMessage(ServiceDeploymentProcessId de
397396
return;
398397

399398
ServiceSingleNodeDeploymentResult depRes = new ServiceSingleNodeDeploymentResult(
400-
srvcProc.localInstancesCount(srvcId));
399+
srvcProc.localInstancesCount(srvcId), log);
401400

402-
attachDeploymentErrors(depRes, err);
401+
depRes.errors(err);
403402

404403
results.put(srvcId, depRes);
405404
});
@@ -464,7 +463,7 @@ protected void onReceiveFullDeploymentsMessage(ServiceClusterDeploymentResultBat
464463
assert depResults != null : "Services deployment actions should be attached.";
465464

466465
final Map<IgniteUuid, Map<UUID, Integer>> fullTops = depResults.deploymentTopologies();
467-
final Map<IgniteUuid, Collection<byte[]>> fullErrors = depResults.deploymentErrors();
466+
final Map<IgniteUuid, Collection<Throwable>> fullErrors = depResults.deploymentErrors();
468467

469468
depActions.deploymentTopologies(fullTops);
470469
depActions.deploymentErrors(fullErrors);
@@ -520,7 +519,7 @@ private void completeInitiatingFuture(final Throwable err) {
520519
return;
521520
}
522521

523-
Collection<byte[]> errors = depActions.deploymentErrors().get(srvcId);
522+
Collection<Throwable> errors = depActions.deploymentErrors().get(srvcId);
524523

525524
if (errors == null) {
526525
srvcProc.completeInitiatingFuture(true, srvcId, null);
@@ -530,27 +529,11 @@ private void completeInitiatingFuture(final Throwable err) {
530529

531530
Throwable depErr = null;
532531

533-
for (byte[] error : errors) {
534-
try {
535-
Throwable t = U.unmarshal(ctx, error, null);
536-
537-
if (depErr == null)
538-
depErr = t;
539-
else
540-
depErr.addSuppressed(t);
541-
}
542-
catch (IgniteCheckedException e) {
543-
log.error("Failed to unmarshal deployment error.", e);
544-
545-
Exception ex = new IgniteCheckedException(
546-
"Failed to unmarshal deployment error, see server logs for details."
547-
);
548-
549-
if (depErr == null)
550-
depErr = ex;
551-
else
552-
depErr.addSuppressed(ex);
553-
}
532+
for (Throwable error : errors) {
533+
if (depErr == null)
534+
depErr = error;
535+
else
536+
depErr.addSuppressed(error);
554537
}
555538

556539
srvcProc.completeInitiatingFuture(true, srvcId, depErr);
@@ -656,7 +639,7 @@ private Collection<ServiceClusterDeploymentResult> buildFullDeploymentsResults(
656639
if (cnt == 0 && res.errors().isEmpty())
657640
return;
658641

659-
ServiceSingleNodeDeploymentResult singleDepRes = new ServiceSingleNodeDeploymentResult(cnt);
642+
ServiceSingleNodeDeploymentResult singleDepRes = new ServiceSingleNodeDeploymentResult(cnt, log);
660643

661644
if (!res.errors().isEmpty())
662645
singleDepRes.errors(res.errors());
@@ -675,44 +658,6 @@ private Collection<ServiceClusterDeploymentResult> buildFullDeploymentsResults(
675658
return fullResults;
676659
}
677660

678-
/**
679-
* @param depRes Service single deployments results.
680-
* @param errors Deployment errors.
681-
*/
682-
private void attachDeploymentErrors(@NotNull ServiceSingleNodeDeploymentResult depRes,
683-
@Nullable Collection<Throwable> errors) {
684-
if (F.isEmpty(errors))
685-
return;
686-
687-
Collection<byte[]> errorsBytes = new ArrayList<>();
688-
689-
for (Throwable th : errors) {
690-
try {
691-
byte[] arr = U.marshal(ctx, th);
692-
693-
errorsBytes.add(arr);
694-
}
695-
catch (IgniteCheckedException e) {
696-
log.error("Failed to marshal deployment error, err=" + th, e);
697-
698-
try {
699-
Exception ex = new IgniteCheckedException(
700-
"Failed to marshal deployment error, see server logs for details, err=" + th
701-
);
702-
703-
byte[] arr = U.marshal(ctx, ex);
704-
705-
errorsBytes.add(arr);
706-
}
707-
catch (IgniteCheckedException ex) {
708-
log.error("Failed to attach deployment error information to deployment result message", ex);
709-
}
710-
}
711-
}
712-
713-
depRes.errors(errorsBytes);
714-
}
715-
716661
/**
717662
* Handles a node leaves topology.
718663
*

modules/core/src/main/java/org/apache/ignite/internal/processors/service/ServiceSingleNodeDeploymentResult.java

Lines changed: 72 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,41 @@
1818
package org.apache.ignite.internal.processors.service;
1919

2020
import java.io.Serializable;
21+
import java.util.ArrayList;
2122
import java.util.Collection;
22-
import java.util.Collections;
23+
import org.apache.ignite.IgniteCheckedException;
24+
import org.apache.ignite.IgniteLogger;
25+
import org.apache.ignite.internal.MarshallableMessage;
2326
import org.apache.ignite.internal.Order;
27+
import org.apache.ignite.internal.util.typedef.F;
2428
import org.apache.ignite.internal.util.typedef.internal.S;
25-
import org.apache.ignite.plugin.extensions.communication.Message;
29+
import org.apache.ignite.internal.util.typedef.internal.U;
30+
import org.apache.ignite.marshaller.Marshaller;
2631
import org.jetbrains.annotations.NotNull;
32+
import org.jetbrains.annotations.Nullable;
2733

2834
/**
2935
* Service single node deployment result.
3036
* <p/>
3137
* Contains count of deployed service instances on single node and deployment errors if exist.
3238
*/
33-
public class ServiceSingleNodeDeploymentResult implements Message, Serializable {
39+
public class ServiceSingleNodeDeploymentResult implements MarshallableMessage, Serializable {
3440
/** */
3541
private static final long serialVersionUID = 0L;
3642

3743
/** Count of service's instances. */
3844
@Order(0)
3945
int cnt;
4046

41-
/** Serialized exceptions. */
47+
/** Exceptions. */
48+
private @Nullable Collection<Throwable> errors;
49+
50+
/** Serialized {@link #errors}. */
4251
@Order(1)
43-
Collection<byte[]> errors;
52+
@Nullable Collection<byte[]> errorsBytes;
53+
54+
/** Logger. */
55+
private IgniteLogger log;
4456

4557
/**
4658
* Empty constructor for marshalling purposes.
@@ -50,9 +62,11 @@ public ServiceSingleNodeDeploymentResult() {
5062

5163
/**
5264
* @param cnt Count of service's instances.
65+
* @param log Logger.
5366
*/
54-
public ServiceSingleNodeDeploymentResult(int cnt) {
67+
public ServiceSingleNodeDeploymentResult(int cnt, IgniteLogger log) {
5568
this.cnt = cnt;
69+
this.log = log;
5670
}
5771

5872
/**
@@ -63,19 +77,66 @@ public int count() {
6377
}
6478

6579
/**
66-
* @return Serialized exceptions.
80+
* @return Exceptions.
6781
*/
68-
@NotNull public Collection<byte[]> errors() {
69-
return errors != null ? errors : Collections.emptyList();
82+
public @NotNull Collection<Throwable> errors() {
83+
return F.emptyIfNull(errors);
7084
}
7185

7286
/**
73-
* @param errors Serialized exceptions.
87+
* @param errors Exceptions.
7488
*/
75-
public void errors(Collection<byte[]> errors) {
89+
public void errors(@Nullable Collection<Throwable> errors) {
7690
this.errors = errors;
7791
}
7892

93+
/** {@inheritDoc} */
94+
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
95+
if (F.isEmpty(errors))
96+
return;
97+
98+
errorsBytes = new ArrayList<>();
99+
100+
for (Throwable th : errors) {
101+
try {
102+
byte[] arr = U.marshal(marsh, th);
103+
104+
errorsBytes.add(arr);
105+
}
106+
catch (IgniteCheckedException e) {
107+
log.error("Failed to marshal deployment error, err=" + th, e);
108+
109+
try {
110+
byte[] arr = U.marshal(
111+
marsh,
112+
new IgniteCheckedException("Failed to marshal deployment error, see server logs for details, err=" + th)
113+
);
114+
115+
errorsBytes.add(arr);
116+
}
117+
catch (IgniteCheckedException ex) {
118+
log.error("Failed to attach deployment error information to deployment result message", ex);
119+
}
120+
}
121+
}
122+
}
123+
124+
/** {@inheritDoc} */
125+
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
126+
try {
127+
if (errorsBytes != null && errors == null) {
128+
errors = new ArrayList<>();
129+
130+
for (byte[] arr : errorsBytes)
131+
errors.add(U.unmarshal(marsh, arr, clsLdr));
132+
}
133+
}
134+
catch (IgniteCheckedException e) {
135+
U.error(null, "Failed to unmarshal deployment result message", e);
136+
137+
errors.add(new IgniteCheckedException("Failed to unmarshal deployment error, see server logs for details."));
138+
}
139+
}
79140

80141
/** {@inheritDoc} */
81142
@Override public String toString() {

0 commit comments

Comments
 (0)