Skip to content

Commit 9a4fe53

Browse files
committed
IGNITE-28477 Merge StartRoutineDiscoveryMessageV2 into StartRoutineDiscoveryMessage
1 parent 66e3417 commit 9a4fe53

6 files changed

Lines changed: 35 additions & 94 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@
182182
import org.apache.ignite.internal.processors.continuous.StartRequestData;
183183
import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
184184
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
185-
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
186185
import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage;
187186
import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
188187
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
@@ -508,7 +507,6 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
508507
withNoSchema(StartRequestData.class);
509508
withNoSchema(StartRoutineDiscoveryMessage.class);
510509
withNoSchema(StartRoutineAckDiscoveryMessage.class);
511-
withNoSchema(StartRoutineDiscoveryMessageV2.class);
512510

513511
// [10600-10800]: Affinity & partition maps.
514512
msgIdx = 10600;

modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
7373
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
7474
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
75+
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode;
7576
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
7677
import org.apache.ignite.internal.systemview.ContinuousQueryViewWalker;
7778
import org.apache.ignite.internal.thread.OomExceptionHandler;
@@ -211,26 +212,13 @@ public GridContinuousProcessor(GridKernalContext ctx) {
211212
@Override public void onCustomEvent(AffinityTopologyVersion topVer,
212213
ClusterNode snd,
213214
StartRoutineDiscoveryMessage msg) {
214-
assert !immutableDiscoCustomMsg;
215-
216215
if (ctx.isStopping())
217216
return;
218217

219-
processStartRequestMutable(snd, msg);
220-
}
221-
});
222-
223-
ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessageV2.class,
224-
new CustomEventListener<StartRoutineDiscoveryMessageV2>() {
225-
@Override public void onCustomEvent(AffinityTopologyVersion topVer,
226-
ClusterNode snd,
227-
StartRoutineDiscoveryMessageV2 msg) {
228-
assert immutableDiscoCustomMsg;
229-
230-
if (ctx.isStopping())
231-
return;
232-
233-
processStartRequestImmutable(topVer, snd, msg);
218+
if (immutableDiscoCustomMsg)
219+
processStartRequestImmutable(topVer, snd, msg);
220+
else
221+
processStartRequestMutable(snd, msg);
234222
}
235223
});
236224

@@ -992,17 +980,15 @@ private AbstractContinuousMessage createStartMessage(UUID routineId,
992980
reqData.prepareMarshal(ctx);
993981

994982
if (!immutableDiscoCustomMsg) {
995-
StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage(
996-
routineId,
997-
reqData);
983+
StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage(routineId, reqData, Mode.MUTABLE);
998984

999985
if (hnd.updateCounters() != null)
1000986
msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters());
1001987

1002988
return msg;
1003989
}
1004990
else
1005-
return new StartRoutineDiscoveryMessageV2(routineId, reqData);
991+
return new StartRoutineDiscoveryMessage(routineId, reqData, Mode.IMMUTABLE);
1006992
}
1007993

1008994
/**
@@ -1468,7 +1454,7 @@ private void processRoutineStartResultMessage(UUID sndId, ContinuousRoutineStart
14681454
*/
14691455
private void processStartRequestImmutable(final AffinityTopologyVersion topVer,
14701456
final ClusterNode snd,
1471-
final StartRoutineDiscoveryMessageV2 msg) {
1457+
final StartRoutineDiscoveryMessage msg) {
14721458
StartRequestData reqData = msg.startRequestData();
14731459

14741460
ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(),

modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,22 @@
3030
* Discovery message used for Continuous Query registration.
3131
*/
3232
public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
33+
/** Discovery message mode. */
34+
enum Mode {
35+
/** Message is mutated by discovery listeners and returns an ack message. */
36+
MUTABLE,
37+
38+
/** Message is not mutated by discovery listeners and does not return an ack message. */
39+
IMMUTABLE
40+
}
41+
3342
/** */
3443
@Order(0)
3544
StartRequestData startReqData;
3645

37-
/** */
46+
/** Errors collected by mutable discovery messages. */
3847
@Order(1)
39-
Map<UUID, ErrorMessage> errs = new HashMap<>();
48+
Map<UUID, ErrorMessage> errs;
4049

4150
/** */
4251
@Order(2)
@@ -46,14 +55,23 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
4655
@Order(3)
4756
Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
4857

58+
/** Discovery message mode. */
59+
@Order(4)
60+
Mode mode = Mode.MUTABLE;
61+
4962
/**
5063
* @param routineId Routine id.
5164
* @param startReqData Start request data.
65+
* @param mode Discovery message mode.
5266
*/
53-
public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) {
67+
StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData, Mode mode) {
5468
super(routineId);
5569

5670
this.startReqData = startReqData;
71+
this.mode = mode;
72+
73+
if (mode == Mode.MUTABLE)
74+
errs = new HashMap<>();
5775
}
5876

5977
/** */
@@ -110,11 +128,14 @@ public void addUpdateCounters(UUID nodeId, Map<Integer, Long> cntrs) {
110128

111129
/** {@inheritDoc} */
112130
@Override public boolean isMutable() {
113-
return true;
131+
return mode == Mode.MUTABLE;
114132
}
115133

116134
/** {@inheritDoc} */
117135
@Override public DiscoveryCustomMessage ackMessage() {
136+
if (!isMutable())
137+
return null;
138+
118139
return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode);
119140
}
120141

modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java

Lines changed: 0 additions & 62 deletions
This file was deleted.

modules/core/src/main/resources/META-INF/classnames.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1518,7 +1518,6 @@ org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$LocalRo
15181518
org.apache.ignite.internal.processors.continuous.StartRequestData
15191519
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage
15201520
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage
1521-
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2
15221521
org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage
15231522
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage
15241523
org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$3

modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.apache.ignite.cluster.ClusterGroup;
4141
import org.apache.ignite.internal.DiscoverySpiTestListener;
4242
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
43-
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
4443
import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
4544
import org.apache.ignite.internal.util.GridConcurrentHashSet;
4645
import org.apache.ignite.internal.util.typedef.P2;
@@ -1051,7 +1050,7 @@ public void testAsyncOld() throws Exception {
10511050
}
10521051
}, IllegalStateException.class, null);
10531052

1054-
lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class);
1053+
lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class);
10551054

10561055
final String topic = "topic";
10571056

@@ -1149,7 +1148,7 @@ public void testAsync() throws Exception {
11491148

11501149
discoSpi.setInternalListener(lsnr);
11511150

1152-
lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class);
1151+
lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class);
11531152

11541153
final String topic = "topic";
11551154

0 commit comments

Comments
 (0)