Skip to content

Commit 1fa94e8

Browse files
authored
IGNITE-28477 Merge StartRoutineDiscoveryMessageV2 into StartRoutineDiscoveryMessage (#13100)
1 parent d21a784 commit 1fa94e8

6 files changed

Lines changed: 34 additions & 94 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,6 @@
189189
import org.apache.ignite.internal.processors.continuous.StartRequestData;
190190
import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
191191
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
192-
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
193192
import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage;
194193
import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
195194
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
@@ -519,9 +518,8 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
519518
withNoSchema(LatchAckMessage.class);
520519
withSchema(AtomicApplicationAttributesAwareRequest.class);
521520
withNoSchema(StartRequestData.class);
522-
withNoSchema(StartRoutineDiscoveryMessage.class);
523521
withNoSchema(StartRoutineAckDiscoveryMessage.class);
524-
withNoSchema(StartRoutineDiscoveryMessageV2.class);
522+
withNoSchema(StartRoutineDiscoveryMessage.class);
525523
withNoSchema(StoredCacheData.class);
526524

527525
// [10600-10800]: Affinity & partition maps.

modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
7373
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
7474
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
75+
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode;
7576
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
7677
import org.apache.ignite.internal.systemview.ContinuousQueryViewWalker;
7778
import org.apache.ignite.internal.thread.OomExceptionHandler;
@@ -211,26 +212,13 @@ public GridContinuousProcessor(GridKernalContext ctx) {
211212
@Override public void onCustomEvent(AffinityTopologyVersion topVer,
212213
ClusterNode snd,
213214
StartRoutineDiscoveryMessage msg) {
214-
assert !immutableDiscoCustomMsg;
215-
216215
if (ctx.isStopping())
217216
return;
218217

219-
processStartRequestMutable(snd, msg);
220-
}
221-
});
222-
223-
ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessageV2.class,
224-
new CustomEventListener<StartRoutineDiscoveryMessageV2>() {
225-
@Override public void onCustomEvent(AffinityTopologyVersion topVer,
226-
ClusterNode snd,
227-
StartRoutineDiscoveryMessageV2 msg) {
228-
assert immutableDiscoCustomMsg;
229-
230-
if (ctx.isStopping())
231-
return;
232-
233-
processStartRequestImmutable(topVer, snd, msg);
218+
if (immutableDiscoCustomMsg)
219+
processStartRequestImmutable(topVer, snd, msg);
220+
else
221+
processStartRequestMutable(snd, msg);
234222
}
235223
});
236224

@@ -991,17 +979,15 @@ private AbstractContinuousMessage createStartMessage(UUID routineId,
991979
reqData.prepareMarshal(ctx);
992980

993981
if (!immutableDiscoCustomMsg) {
994-
StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage(
995-
routineId,
996-
reqData);
982+
StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage(routineId, reqData, Mode.MUTABLE);
997983

998984
if (hnd.updateCounters() != null)
999985
msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters());
1000986

1001987
return msg;
1002988
}
1003989
else
1004-
return new StartRoutineDiscoveryMessageV2(routineId, reqData);
990+
return new StartRoutineDiscoveryMessage(routineId, reqData, Mode.IMMUTABLE);
1005991
}
1006992

1007993
/**
@@ -1467,7 +1453,7 @@ private void processRoutineStartResultMessage(UUID sndId, ContinuousRoutineStart
14671453
*/
14681454
private void processStartRequestImmutable(final AffinityTopologyVersion topVer,
14691455
final ClusterNode snd,
1470-
final StartRoutineDiscoveryMessageV2 msg) {
1456+
final StartRoutineDiscoveryMessage msg) {
14711457
StartRequestData reqData = msg.startRequestData();
14721458

14731459
ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(),

modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,26 @@
2626
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
2727
import org.apache.ignite.internal.util.typedef.internal.S;
2828

29+
import static org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode.MUTABLE;
30+
2931
/**
3032
* Discovery message used for Continuous Query registration.
3133
*/
3234
public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
35+
/** Discovery message mode. */
36+
enum Mode {
37+
/** Mutable discovery mode. */
38+
MUTABLE,
39+
40+
/** Immutable discovery mode. */
41+
IMMUTABLE
42+
}
43+
3344
/** */
3445
@Order(0)
3546
StartRequestData startReqData;
3647

37-
/** */
48+
/** Errors collected by mutable discovery. */
3849
@Order(1)
3950
Map<UUID, ErrorMessage> errs = new HashMap<>();
4051

@@ -46,14 +57,20 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
4657
@Order(3)
4758
Map<UUID, Map<Integer, Long>> updateCntrsPerNode;
4859

60+
/** Discovery message mode. */
61+
@Order(4)
62+
Mode mode;
63+
4964
/**
5065
* @param routineId Routine id.
5166
* @param startReqData Start request data.
67+
* @param mode Discovery message mode.
5268
*/
53-
public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) {
69+
StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData, Mode mode) {
5470
super(routineId);
5571

5672
this.startReqData = startReqData;
73+
this.mode = mode;
5774
}
5875

5976
/** */
@@ -110,11 +127,14 @@ public void addUpdateCounters(UUID nodeId, Map<Integer, Long> cntrs) {
110127

111128
/** {@inheritDoc} */
112129
@Override public boolean isMutable() {
113-
return true;
130+
return mode == MUTABLE;
114131
}
115132

116133
/** {@inheritDoc} */
117134
@Override public DiscoveryCustomMessage ackMessage() {
135+
if (!isMutable())
136+
return null;
137+
118138
return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode);
119139
}
120140

modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java

Lines changed: 0 additions & 62 deletions
This file was deleted.

modules/core/src/main/resources/META-INF/classnames.properties

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1518,7 +1518,6 @@ org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$LocalRo
15181518
org.apache.ignite.internal.processors.continuous.StartRequestData
15191519
org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage
15201520
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage
1521-
org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2
15221521
org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage
15231522
org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage
15241523
org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$3

modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.apache.ignite.cluster.ClusterGroup;
4141
import org.apache.ignite.internal.DiscoverySpiTestListener;
4242
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage;
43-
import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2;
4443
import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage;
4544
import org.apache.ignite.internal.util.GridConcurrentHashSet;
4645
import org.apache.ignite.internal.util.typedef.P2;
@@ -1051,7 +1050,7 @@ public void testAsyncOld() throws Exception {
10511050
}
10521051
}, IllegalStateException.class, null);
10531052

1054-
lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class);
1053+
lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class);
10551054

10561055
final String topic = "topic";
10571056

@@ -1149,7 +1148,7 @@ public void testAsync() throws Exception {
11491148

11501149
discoSpi.setInternalListener(lsnr);
11511150

1152-
lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class);
1151+
lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class);
11531152

11541153
final String topic = "topic";
11551154

0 commit comments

Comments
 (0)