Skip to content

Commit 222a85b

Browse files
authored
IGNITE-27776 Simplify custom messages hierarchy (#12708)
1 parent af5f7aa commit 222a85b

56 files changed

Lines changed: 290 additions & 409 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/CustomMessageWrapper.java

Lines changed: 0 additions & 68 deletions
This file was deleted.

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.io.Serializable;
2121
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
2222
import org.apache.ignite.lang.IgniteUuid;
23-
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
2423
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
2524
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
2625
import org.jetbrains.annotations.Nullable;
@@ -93,8 +92,6 @@ public interface DiscoveryCustomMessage extends Serializable {
9392
public boolean isMutable();
9493

9594
/**
96-
* See {@link DiscoverySpiCustomMessage#stopProcess()}.
97-
*
9895
* @return {@code True} if message should not be sent to others nodes after it was processed on coordinator.
9996
*/
10097
public default boolean stopProcess() {

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@
132132
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
133133
import org.apache.ignite.spi.discovery.DiscoveryNotification;
134134
import org.apache.ignite.spi.discovery.DiscoverySpi;
135-
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
136135
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
137136
import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
138137
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
@@ -593,8 +592,8 @@ private void onDiscovery0(DiscoveryNotification notification) {
593592
ClusterNode node = notification.getNode();
594593
long topVer = notification.getTopVer();
595594

596-
DiscoveryCustomMessage customMsg = notification.getCustomMsgData() == null ? null
597-
: ((CustomMessageWrapper)notification.getCustomMsgData()).delegate();
595+
DiscoveryCustomMessage customMsg = U.unwrapCustomMessage(notification.customMessage() == null ?
596+
null : notification.customMessage());
598597

599598
if (skipMessage(notification.type(), customMsg))
600599
return;
@@ -933,7 +932,7 @@ public SecurityAwareNotificationTask(DiscoveryNotification notification) {
933932

934933
/** */
935934
@Override public void run() {
936-
DiscoverySpiCustomMessage customMsg = notification.getCustomMsgData();
935+
DiscoveryCustomMessage customMsg = notification.customMessage();
937936

938937
if (customMsg instanceof SecurityAwareCustomMessageWrapper) {
939938
UUID secSubjId = ((SecurityAwareCustomMessageWrapper)customMsg).securitySubjectId();
@@ -2336,7 +2335,7 @@ public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteCheckedExce
23362335

23372336
getSpi().sendCustomEvent(security.enabled()
23382337
? new SecurityAwareCustomMessageWrapper(msg, security.securityContext().subject().id())
2339-
: new CustomMessageWrapper(msg));
2338+
: msg);
23402339
}
23412340
catch (IgniteClientDisconnectedException e) {
23422341
IgniteFuture<?> reconnectFut = ctx.cluster().clientReconnectFuture();
@@ -2941,7 +2940,7 @@ private static class NotificationEvent {
29412940
Collection<ClusterNode> topSnapshot;
29422941

29432942
/** Data. */
2944-
@Nullable DiscoveryCustomMessage data;
2943+
@Nullable DiscoveryCustomMessage customMsg;
29452944

29462945
/** Span container. */
29472946
SpanContainer spanContainer;
@@ -2955,7 +2954,7 @@ private static class NotificationEvent {
29552954
* @param node Node.
29562955
* @param discoCache Disco cache.
29572956
* @param topSnapshot Topology snapshot.
2958-
* @param data Data.
2957+
* @param customMsg Data.
29592958
* @param spanContainer Span container.
29602959
*/
29612960
public NotificationEvent(
@@ -2964,7 +2963,7 @@ public NotificationEvent(
29642963
ClusterNode node,
29652964
DiscoCache discoCache,
29662965
Collection<ClusterNode> topSnapshot,
2967-
@Nullable DiscoveryCustomMessage data,
2966+
@Nullable DiscoveryCustomMessage customMsg,
29682967
SpanContainer spanContainer,
29692968
SecurityContext secCtx
29702969
) {
@@ -2973,7 +2972,7 @@ public NotificationEvent(
29732972
this.node = node;
29742973
this.discoCache = discoCache;
29752974
this.topSnapshot = topSnapshot;
2976-
this.data = data;
2975+
this.customMsg = customMsg;
29772976
this.spanContainer = spanContainer;
29782977
this.secCtx = secCtx;
29792978
}
@@ -3072,7 +3071,7 @@ else if (type == EVT_CLIENT_NODE_RECONNECTED)
30723071
* @param notificationEvt Notification event.
30733072
*/
30743073
void addEvent(NotificationEvent notificationEvt) {
3075-
assert notificationEvt.node != null : notificationEvt.data;
3074+
assert notificationEvt.node != null : notificationEvt.customMsg;
30763075

30773076
if (notificationEvt.type == EVT_CLIENT_NODE_DISCONNECTED)
30783077
discoWrk.disconnectEvtFut = new GridFutureAdapter();
@@ -3228,11 +3227,11 @@ private void body0() throws InterruptedException {
32283227
customEvt.type(type);
32293228
customEvt.topologySnapshot(topVer.topologyVersion(), evt.topSnapshot);
32303229
customEvt.affinityTopologyVersion(topVer);
3231-
customEvt.customMessage(evt.data);
3230+
customEvt.customMessage(evt.customMsg);
32323231
customEvt.span(evt.spanContainer != null ? evt.spanContainer.span() : null);
32333232

32343233
if (evt.discoCache == null) {
3235-
assert discoCache != null : evt.data;
3234+
assert discoCache != null : evt.customMsg;
32363235

32373236
evt.discoCache = discoCache;
32383237
}

modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/SecurityAwareCustomMessageWrapper.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,20 @@
2121
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
2222
import org.jetbrains.annotations.Nullable;
2323

24-
/** Extends {@link CustomMessageWrapper} with ID of security subject that initiated the current message. */
25-
public class SecurityAwareCustomMessageWrapper extends CustomMessageWrapper {
24+
/** Custom message wrapper with ID of security subject that initiated the current message. */
25+
public class SecurityAwareCustomMessageWrapper extends DiscoverySpiCustomMessage {
2626
/** */
2727
private static final long serialVersionUID = 0L;
2828

2929
/** Security subject ID. */
3030
private final UUID secSubjId;
3131

32+
/** Original message. */
33+
private final DiscoveryCustomMessage delegate;
34+
3235
/** */
3336
public SecurityAwareCustomMessageWrapper(DiscoveryCustomMessage delegate, UUID secSubjId) {
34-
super(delegate);
35-
37+
this.delegate = delegate;
3638
this.secSubjId = secSubjId;
3739
}
3840

@@ -42,8 +44,25 @@ public UUID securitySubjectId() {
4244
}
4345

4446
/** {@inheritDoc} */
45-
@Override public @Nullable DiscoverySpiCustomMessage ackMessage() {
46-
DiscoveryCustomMessage ack = delegate().ackMessage();
47+
@Override public boolean isMutable() {
48+
return delegate.isMutable();
49+
}
50+
51+
/** {@inheritDoc} */
52+
@Override public boolean stopProcess() {
53+
return delegate.stopProcess();
54+
}
55+
56+
/**
57+
* @return Delegate.
58+
*/
59+
public DiscoveryCustomMessage delegate() {
60+
return delegate;
61+
}
62+
63+
/** {@inheritDoc} */
64+
@Override public @Nullable DiscoveryCustomMessage ackMessage() {
65+
DiscoveryCustomMessage ack = delegate.ackMessage();
4766

4867
return ack == null ? null : new SecurityAwareCustomMessageWrapper(ack, secSubjId);
4968
}

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneNoopDiscoverySpi.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323
import java.util.UUID;
2424
import org.apache.ignite.IgniteException;
2525
import org.apache.ignite.cluster.ClusterNode;
26+
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2627
import org.apache.ignite.lang.IgniteProductVersion;
2728
import org.apache.ignite.spi.IgniteSpiAdapter;
2829
import org.apache.ignite.spi.IgniteSpiException;
2930
import org.apache.ignite.spi.IgniteSpiNoop;
3031
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
3132
import org.apache.ignite.spi.discovery.DiscoverySpi;
32-
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
3333
import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange;
3434
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
3535
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
@@ -101,7 +101,7 @@ public class StandaloneNoopDiscoverySpi extends IgniteSpiAdapter implements Disc
101101
}
102102

103103
/** {@inheritDoc} */
104-
@Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException {
104+
@Override public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException {
105105

106106
}
107107

modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,9 @@
187187
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
188188
import org.apache.ignite.internal.managers.deployment.GridDeployment;
189189
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
190+
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
190191
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
192+
import org.apache.ignite.internal.managers.discovery.SecurityAwareCustomMessageWrapper;
191193
import org.apache.ignite.internal.mxbean.IgniteStandardMXBean;
192194
import org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper;
193195
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
@@ -8305,4 +8307,14 @@ public static <T extends IgniteDataTransferObject> IgniteDataTransferObjectSeria
83058307
return (IgniteDataTransferObjectSerializer<T>)EMPTY_DTO_SERIALIZER;
83068308
}
83078309
}
8310+
8311+
/**
8312+
* Unwraps messsage if it is wrapped by {@link SecurityAwareCustomMessageWrapper}.
8313+
*
8314+
* @param msg Message.
8315+
*/
8316+
public static DiscoveryCustomMessage unwrapCustomMessage(DiscoveryCustomMessage msg) {
8317+
return msg instanceof SecurityAwareCustomMessageWrapper ?
8318+
((SecurityAwareCustomMessageWrapper)msg).delegate() : msg;
8319+
}
83088320
}

modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryNotification.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Collection;
2020
import java.util.NavigableMap;
2121
import org.apache.ignite.cluster.ClusterNode;
22+
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2223
import org.apache.ignite.internal.processors.tracing.messages.SpanContainer;
2324
import org.jetbrains.annotations.Nullable;
2425

@@ -41,8 +42,8 @@ public class DiscoveryNotification {
4142
/** Topology history. */
4243
private @Nullable NavigableMap<Long, Collection<ClusterNode>> topHist;
4344

44-
/** Custom message data. */
45-
private @Nullable DiscoverySpiCustomMessage customMsgData;
45+
/** Custom message. */
46+
private @Nullable DiscoveryCustomMessage customMsg;
4647

4748
/** Span container. */
4849
private SpanContainer spanContainer;
@@ -66,7 +67,7 @@ public DiscoveryNotification(int eventType, long topVer, ClusterNode node, Colle
6667
* @param node Node.
6768
* @param topSnapshot Topology snapshot.
6869
* @param topHist Topology history.
69-
* @param customMsgData Custom message data.
70+
* @param customMsg Custom message.
7071
* @param spanContainer Span container.
7172
*/
7273
public DiscoveryNotification(
@@ -75,15 +76,15 @@ public DiscoveryNotification(
7576
ClusterNode node,
7677
Collection<ClusterNode> topSnapshot,
7778
@Nullable NavigableMap<Long, Collection<ClusterNode>> topHist,
78-
@Nullable DiscoverySpiCustomMessage customMsgData,
79+
@Nullable DiscoveryCustomMessage customMsg,
7980
SpanContainer spanContainer
8081
) {
8182
this.eventType = eventType;
8283
this.topVer = topVer;
8384
this.node = node;
8485
this.topSnapshot = topSnapshot;
8586
this.topHist = topHist;
86-
this.customMsgData = customMsgData;
87+
this.customMsg = customMsg;
8788
this.spanContainer = spanContainer;
8889
}
8990

@@ -123,10 +124,10 @@ public NavigableMap<Long, Collection<ClusterNode>> getTopHist() {
123124
}
124125

125126
/**
126-
* @return Custom message data.
127+
* @return Custom message.
127128
*/
128-
public DiscoverySpiCustomMessage getCustomMsgData() {
129-
return customMsgData;
129+
public DiscoveryCustomMessage customMessage() {
130+
return customMsg;
130131
}
131132

132133
/**

modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.UUID;
2424
import org.apache.ignite.IgniteException;
2525
import org.apache.ignite.cluster.ClusterNode;
26+
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2627
import org.apache.ignite.lang.IgniteProductVersion;
2728
import org.apache.ignite.spi.IgniteSpi;
2829
import org.apache.ignite.spi.IgniteSpiException;
@@ -153,7 +154,7 @@ public interface DiscoverySpi extends IgniteSpi {
153154
* @param msg Custom message.
154155
* @throws IgniteException if failed to sent the event message.
155156
*/
156-
public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException;
157+
public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteException;
157158

158159
/**
159160
* Initiates failure of provided node.

0 commit comments

Comments
 (0)