Skip to content

Commit 8b70dc0

Browse files
authored
IGNITE-26013 Use MessageSerializer for SnapshotFiles* messages (#12308)
1 parent 18cad73 commit 8b70dc0

4 files changed

Lines changed: 54 additions & 183 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
import org.apache.ignite.internal.codegen.NearCacheUpdatesSerializer;
5555
import org.apache.ignite.internal.codegen.SchemaOperationStatusMessageSerializer;
5656
import org.apache.ignite.internal.codegen.SessionChannelMessageSerializer;
57+
import org.apache.ignite.internal.codegen.SnapshotFilesFailureMessageSerializer;
58+
import org.apache.ignite.internal.codegen.SnapshotFilesRequestMessageSerializer;
5759
import org.apache.ignite.internal.codegen.TcpInverseConnectionResponseMessageSerializer;
5860
import org.apache.ignite.internal.codegen.TxLockSerializer;
5961
import org.apache.ignite.internal.codegen.TxLocksRequestSerializer;
@@ -359,8 +361,10 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
359361
factory.register(SessionChannelMessage.TYPE_CODE, SessionChannelMessage::new, new SessionChannelMessageSerializer());
360362
factory.register(SingleNodeMessage.TYPE_CODE, SingleNodeMessage::new);
361363
factory.register((short)177, TcpInverseConnectionResponseMessage::new, new TcpInverseConnectionResponseMessageSerializer());
362-
factory.register(SnapshotFilesRequestMessage.TYPE_CODE, SnapshotFilesRequestMessage::new);
363-
factory.register(SnapshotFilesFailureMessage.TYPE_CODE, SnapshotFilesFailureMessage::new);
364+
factory.register(SnapshotFilesRequestMessage.TYPE_CODE, SnapshotFilesRequestMessage::new,
365+
new SnapshotFilesRequestMessageSerializer());
366+
factory.register(SnapshotFilesFailureMessage.TYPE_CODE, SnapshotFilesFailureMessage::new,
367+
new SnapshotFilesFailureMessageSerializer());
364368
factory.register((short)180, AtomicApplicationAttributesAwareRequest::new);
365369
factory.register((short)181, TransactionAttributesAwareRequest::new);
366370

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotMessage.java

Lines changed: 7 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,17 @@
1818

1919
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
2020

21-
import java.nio.ByteBuffer;
21+
import org.apache.ignite.internal.Order;
2222
import org.apache.ignite.internal.util.typedef.internal.S;
2323
import org.apache.ignite.internal.util.typedef.internal.U;
2424
import org.apache.ignite.plugin.extensions.communication.Message;
25-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
26-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
2725

2826
/**
2927
*
3028
*/
3129
abstract class AbstractSnapshotMessage implements Message {
3230
/** Unique message ID. */
31+
@Order(0)
3332
private String id;
3433

3534
/**
@@ -55,41 +54,11 @@ public String id() {
5554
return id;
5655
}
5756

58-
/** {@inheritDoc} */
59-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
60-
writer.setBuffer(buf);
61-
62-
if (!writer.isHeaderWritten()) {
63-
if (!writer.writeHeader(directType()))
64-
return false;
65-
66-
writer.onHeaderWritten();
67-
}
68-
69-
if (writer.state() == 0) {
70-
if (!writer.writeString(id))
71-
return false;
72-
73-
writer.incrementState();
74-
}
75-
76-
return true;
77-
}
78-
79-
/** {@inheritDoc} */
80-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
81-
reader.setBuffer(buf);
82-
83-
if (reader.state() == 0) {
84-
id = reader.readString();
85-
86-
if (!reader.isLastRead())
87-
return false;
88-
89-
reader.incrementState();
90-
}
91-
92-
return true;
57+
/**
58+
* @param id Unique message ID.
59+
*/
60+
public void id(String id) {
61+
this.id = id;
9362
}
9463

9564
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesFailureMessage.java

Lines changed: 3 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@
1818

1919
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
2020

21-
import java.nio.ByteBuffer;
21+
import org.apache.ignite.internal.Order;
2222
import org.apache.ignite.internal.util.typedef.internal.S;
23-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
24-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
2523

2624
/**
2725
* Message indicating a failure occurred during processing snapshot files request.
@@ -31,6 +29,7 @@ public class SnapshotFilesFailureMessage extends AbstractSnapshotMessage {
3129
public static final short TYPE_CODE = 179;
3230

3331
/** Exception message which is occurred during snapshot request processing. */
32+
@Order(value = 1, method = "errorMessage")
3433
private String errMsg;
3534

3635
/**
@@ -59,55 +58,9 @@ public String errorMessage() {
5958

6059
/**
6160
* @param errMsg Response error message.
62-
* @return {@code this} for chaining.
6361
*/
64-
public SnapshotFilesFailureMessage errorMessage(String errMsg) {
62+
public void errorMessage(String errMsg) {
6563
this.errMsg = errMsg;
66-
67-
return this;
68-
}
69-
70-
/** {@inheritDoc} */
71-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
72-
writer.setBuffer(buf);
73-
74-
if (!super.writeTo(buf, writer))
75-
return false;
76-
77-
if (!writer.isHeaderWritten()) {
78-
if (!writer.writeHeader(directType()))
79-
return false;
80-
81-
writer.onHeaderWritten();
82-
}
83-
84-
if (writer.state() == 1) {
85-
if (!writer.writeString(errMsg))
86-
return false;
87-
88-
writer.incrementState();
89-
}
90-
91-
return true;
92-
}
93-
94-
/** {@inheritDoc} */
95-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
96-
reader.setBuffer(buf);
97-
98-
if (!super.readFrom(buf, reader))
99-
return false;
100-
101-
if (reader.state() == 1) {
102-
errMsg = reader.readString();
103-
104-
if (!reader.isLastRead())
105-
return false;
106-
107-
reader.incrementState();
108-
}
109-
110-
return true;
11164
}
11265

11366
/** {@inheritDoc} */

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFilesRequestMessage.java

Lines changed: 38 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,16 @@
1818

1919
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
2020

21-
import java.nio.ByteBuffer;
2221
import java.util.Arrays;
2322
import java.util.HashMap;
2423
import java.util.Map;
2524
import java.util.Set;
2625
import java.util.UUID;
2726
import java.util.stream.Collectors;
28-
import org.apache.ignite.internal.GridDirectMap;
27+
import org.apache.ignite.internal.Order;
2928
import org.apache.ignite.internal.util.typedef.F;
3029
import org.apache.ignite.internal.util.typedef.internal.S;
3130
import org.apache.ignite.internal.util.typedef.internal.U;
32-
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
33-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
34-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
3531
import org.jetbrains.annotations.Nullable;
3632

3733
/**
@@ -42,16 +38,19 @@ public class SnapshotFilesRequestMessage extends AbstractSnapshotMessage {
4238
public static final short TYPE_CODE = 178;
4339

4440
/** Snapshot operation request ID. */
41+
@Order(value = 1, method = "requestId")
4542
private UUID reqId;
4643

4744
/** Snapshot name to request. */
45+
@Order(value = 2, method = "snapshotName")
4846
private String snpName;
4947

5048
/** Snapshot directory path. */
49+
@Order(value = 3, method = "snapshotPath")
5150
private String snpPath;
5251

5352
/** Map of cache group ids and corresponding set of its partition ids. */
54-
@GridDirectMap(keyType = Integer.class, valueType = int[].class)
53+
@Order(value = 4, method = "partitions")
5554
private Map<Integer, int[]> parts;
5655

5756
/**
@@ -100,114 +99,60 @@ public Map<Integer, Set<Integer>> parts() {
10099
return res;
101100
}
102101

102+
/**
103+
* @return The demanded cache group partitions per each cache group.
104+
*/
105+
public Map<Integer, int[]> partitions() {
106+
return parts;
107+
}
108+
109+
/**
110+
* @param parts The demanded cache group partitions per each cache group.
111+
*/
112+
public void partitions(Map<Integer, int[]> parts) {
113+
this.parts = parts;
114+
}
115+
103116
/**
104117
* @return Requested snapshot name.
105118
*/
106119
public String snapshotName() {
107120
return snpName;
108121
}
109122

123+
/**
124+
* @param snpName Requested snapshot name.
125+
*/
126+
public void snapshotName(String snpName) {
127+
this.snpName = snpName;
128+
}
129+
110130
/**
111131
* @return Snapshot directory path.
112132
*/
113133
public String snapshotPath() {
114134
return snpPath;
115135
}
116136

137+
/**
138+
* @param snpPath Snapshot directory path.
139+
*/
140+
public void snapshotPath(String snpPath) {
141+
this.snpPath = snpPath;
142+
}
143+
117144
/**
118145
* @return Snapshot operation request ID.
119146
*/
120147
public UUID requestId() {
121148
return reqId;
122149
}
123150

124-
/** {@inheritDoc} */
125-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
126-
writer.setBuffer(buf);
127-
128-
if (!super.writeTo(buf, writer))
129-
return false;
130-
131-
if (!writer.isHeaderWritten()) {
132-
if (!writer.writeHeader(directType()))
133-
return false;
134-
135-
writer.onHeaderWritten();
136-
}
137-
138-
switch (writer.state()) {
139-
case 1:
140-
if (!writer.writeMap(parts, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR))
141-
return false;
142-
143-
writer.incrementState();
144-
145-
case 2:
146-
if (!writer.writeUuid(reqId))
147-
return false;
148-
149-
writer.incrementState();
150-
151-
case 3:
152-
if (!writer.writeString(snpName))
153-
return false;
154-
155-
writer.incrementState();
156-
157-
case 4:
158-
if (!writer.writeString(snpPath))
159-
return false;
160-
161-
writer.incrementState();
162-
163-
}
164-
165-
return true;
166-
}
167-
168-
/** {@inheritDoc} */
169-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
170-
reader.setBuffer(buf);
171-
172-
if (!super.readFrom(buf, reader))
173-
return false;
174-
175-
switch (reader.state()) {
176-
case 1:
177-
parts = reader.readMap(MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false);
178-
179-
if (!reader.isLastRead())
180-
return false;
181-
182-
reader.incrementState();
183-
184-
case 2:
185-
reqId = reader.readUuid();
186-
187-
if (!reader.isLastRead())
188-
return false;
189-
190-
reader.incrementState();
191-
192-
case 3:
193-
snpName = reader.readString();
194-
195-
if (!reader.isLastRead())
196-
return false;
197-
198-
reader.incrementState();
199-
200-
case 4:
201-
snpPath = reader.readString();
202-
203-
if (!reader.isLastRead())
204-
return false;
205-
206-
reader.incrementState();
207-
208-
}
209-
210-
return true;
151+
/**
152+
* @param reqId Snapshot operation request ID.
153+
*/
154+
public void requestId(UUID reqId) {
155+
this.reqId = reqId;
211156
}
212157

213158
/** {@inheritDoc} */

0 commit comments

Comments
 (0)