Skip to content

Commit a54b9f4

Browse files
authored
IGNITE-28721 Remove writeTo/readFrom methods from Message (#13177)
1 parent c0e1b38 commit a54b9f4

24 files changed

Lines changed: 493 additions & 515 deletions

modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class MessageProcessor extends AbstractProcessor {
7070
static final String MESSAGE_INTERFACE = "org.apache.ignite.plugin.extensions.communication.Message";
7171

7272
/** Compressed message. */
73-
static final String COMPRESSED_MESSAGE_INTERFACE = "org.apache.ignite.internal.managers.communication.CompressedMessage";
73+
static final String COMPRESSED_MESSAGE_CLASS = "org.apache.ignite.internal.managers.communication.CompressedMessage";
7474

7575
/** Externalizable message. */
7676
static final String MARSHALLABLE_MESSAGE_INTERFACE = "org.apache.ignite.internal.MarshallableMessage";
@@ -85,7 +85,7 @@ public class MessageProcessor extends AbstractProcessor {
8585
/** Messages with no fields. A serializer generation intentionally skipped. */
8686
static final String[] SKIP_MESSAGES = {
8787
"org.apache.ignite.internal.processors.odbc.ClientMessage",
88-
"org.apache.ignite.internal.managers.communication.CompressedMessage",
88+
COMPRESSED_MESSAGE_CLASS,
8989
"org.apache.ignite.loadtests.communication.GridTestMessage"
9090
};
9191

modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
import org.apache.ignite.internal.util.typedef.internal.SB;
5656
import org.jetbrains.annotations.Nullable;
5757

58-
import static org.apache.ignite.internal.MessageProcessor.COMPRESSED_MESSAGE_INTERFACE;
58+
import static org.apache.ignite.internal.MessageProcessor.COMPRESSED_MESSAGE_CLASS;
5959
import static org.apache.ignite.internal.MessageProcessor.MARSHALLABLE_MESSAGE_INTERFACE;
6060
import static org.apache.ignite.internal.MessageProcessor.MESSAGE_INTERFACE;
6161

@@ -460,7 +460,7 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList
460460
returnFalseIfWriteFailed(write, field, "writer.writeGridLongList", getExpr);
461461

462462
else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
463-
if (sameType(type, COMPRESSED_MESSAGE_INTERFACE))
463+
if (sameType(type, COMPRESSED_MESSAGE_CLASS))
464464
throw new IllegalArgumentException(COMPRESSED_MSG_ERROR);
465465

466466
if (compress)
@@ -686,7 +686,7 @@ else if (assignableFrom(type, type("org.apache.ignite.internal.util.GridLongList
686686
returnFalseIfReadFailed(field, "reader.readGridLongList");
687687

688688
else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
689-
if (sameType(type, COMPRESSED_MESSAGE_INTERFACE))
689+
if (sameType(type, COMPRESSED_MESSAGE_CLASS))
690690
throw new IllegalArgumentException(COMPRESSED_MSG_ERROR);
691691

692692
if (compress)
@@ -844,7 +844,7 @@ private String messageCollectionItemType(TypeMirror type) throws Exception {
844844
if (primitiveType != null)
845845
return primitiveType.getKind().toString();
846846

847-
if (sameType(type, COMPRESSED_MESSAGE_INTERFACE))
847+
if (sameType(type, COMPRESSED_MESSAGE_CLASS))
848848
throw new IllegalArgumentException(COMPRESSED_MSG_ERROR);
849849
}
850850

modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C
343343

344344
// [5000 - 5500]: Utility messages. Most of them originally come from Discovery.
345345
msgIdx = 5000;
346-
// We don't use the code‑generated serializer for CompressedMessage - serialization is highly customized.
347-
factory.register(msgIdx++, CompressedMessage::new);
346+
withNoSchema(CompressedMessage.class);
348347
withNoSchemaResolvedClassLoader(ErrorMessage.class);
349348
withNoSchema(InetSocketAddressMessage.class);
350349
withNoSchema(InetAddressMessage.class);

modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java

Lines changed: 9 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import org.apache.ignite.IgniteException;
2929
import org.apache.ignite.internal.util.typedef.internal.S;
3030
import org.apache.ignite.plugin.extensions.communication.Message;
31-
import org.apache.ignite.plugin.extensions.communication.MessageReader;
32-
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
3331

3432
/**
3533
* Internal message used when transmitting fields annotated with @Compress over the network.
@@ -41,22 +39,22 @@ public class CompressedMessage implements Message {
4139
static final int CHUNK_SIZE = 10 * 1024;
4240

4341
/** Reader buffer capacity. */
44-
private static final int BUFFER_CAPACITY = 10 * CHUNK_SIZE;
42+
static final int BUFFER_CAPACITY = 10 * CHUNK_SIZE;
4543

4644
/** Temporary buffer for compressed data received over the network. */
47-
private ByteBuffer tmpBuf;
45+
ByteBuffer tmpBuf;
4846

4947
/** Raw data size. */
50-
private int dataSize;
48+
int dataSize;
5149

5250
/** Chunked byte reader. */
53-
private ChunkedByteReader chunkedReader;
51+
ChunkedByteReader chunkedReader;
5452

5553
/** Chunk. */
56-
private byte[] chunk;
54+
byte[] chunk;
5755

5856
/** Flag indicating whether this is the last chunk. */
59-
private boolean finalChunk;
57+
boolean finalChunk;
6058

6159
/** Compression level. */
6260
private int compressionLvl;
@@ -90,114 +88,9 @@ public byte[] uncompressed() {
9088
return uncompress();
9189
}
9290

93-
/** {@inheritDoc} */
94-
@SuppressWarnings("deprecation")
95-
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
96-
writer.setBuffer(buf);
97-
98-
if (!writer.isHeaderWritten()) {
99-
if (!writer.writeHeader(directType()))
100-
return false;
101-
102-
writer.onHeaderWritten();
103-
}
104-
105-
while (true) {
106-
if (chunk == null && chunkedReader != null) {
107-
chunk = chunkedReader.nextChunk();
108-
109-
finalChunk = (chunk == null);
110-
}
111-
112-
switch (writer.state()) {
113-
case 0:
114-
if (!writer.writeInt(dataSize))
115-
return false;
116-
117-
writer.incrementState();
118-
119-
if (dataSize == 0)
120-
return true;
121-
122-
case 1:
123-
if (!writer.writeBoolean(finalChunk))
124-
return false;
125-
126-
writer.incrementState();
127-
128-
if (finalChunk)
129-
return true;
130-
131-
case 2:
132-
if (!writer.writeByteArray(chunk))
133-
return false;
134-
135-
chunk = null;
136-
137-
writer.decrementState();
138-
}
139-
}
140-
}
141-
142-
/** {@inheritDoc} */
143-
@SuppressWarnings("deprecation")
144-
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
145-
reader.setBuffer(buf);
146-
147-
if (tmpBuf == null)
148-
tmpBuf = ByteBuffer.allocateDirect(BUFFER_CAPACITY);
149-
150-
assert chunk == null : chunk;
151-
152-
while (true) {
153-
switch (reader.state()) {
154-
case 0:
155-
dataSize = reader.readInt();
156-
157-
if (!reader.isLastRead())
158-
return false;
159-
160-
if (dataSize == 0)
161-
return true;
162-
163-
reader.incrementState();
164-
165-
case 1:
166-
finalChunk = reader.readBoolean();
167-
168-
if (!reader.isLastRead())
169-
return false;
170-
171-
if (finalChunk)
172-
return true;
173-
174-
reader.incrementState();
175-
176-
case 2:
177-
chunk = reader.readByteArray();
178-
179-
if (!reader.isLastRead())
180-
return false;
181-
182-
if (chunk != null) {
183-
if (tmpBuf.remaining() <= CHUNK_SIZE) {
184-
ByteBuffer newTmpBuf = ByteBuffer.allocateDirect(tmpBuf.capacity() * 2);
185-
186-
tmpBuf.flip();
187-
188-
newTmpBuf.put(tmpBuf);
189-
190-
tmpBuf = newTmpBuf;
191-
}
192-
193-
tmpBuf.put(chunk);
194-
195-
reader.decrementState();
196-
197-
chunk = null;
198-
}
199-
}
200-
}
91+
/** @return Next chunk of data or null. */
92+
public byte[] nextChunk() {
93+
return chunkedReader.nextChunk();
20194
}
20295

20396
/**
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.managers.communication;
19+
20+
import java.nio.ByteBuffer;
21+
import org.apache.ignite.plugin.extensions.communication.MessageReader;
22+
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
23+
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
24+
25+
import static org.apache.ignite.internal.managers.communication.CompressedMessage.BUFFER_CAPACITY;
26+
import static org.apache.ignite.internal.managers.communication.CompressedMessage.CHUNK_SIZE;
27+
28+
/** Message serializer for compressed message. */
29+
public class CompressedMessageSerializer implements MessageSerializer<CompressedMessage> {
30+
/** {@inheritDoc} */
31+
@Override public boolean writeTo(CompressedMessage msg, MessageWriter writer) {
32+
if (!writer.isHeaderWritten()) {
33+
if (!writer.writeHeader(msg.directType()))
34+
return false;
35+
36+
writer.onHeaderWritten();
37+
}
38+
39+
while (true) {
40+
if (msg.chunk == null && msg.chunkedReader != null) {
41+
msg.chunk = msg.nextChunk();
42+
43+
msg.finalChunk = (msg.chunk == null);
44+
}
45+
46+
switch (writer.state()) {
47+
case 0:
48+
if (!writer.writeInt(msg.dataSize))
49+
return false;
50+
51+
writer.incrementState();
52+
53+
if (msg.dataSize == 0)
54+
return true;
55+
56+
case 1:
57+
if (!writer.writeBoolean(msg.finalChunk))
58+
return false;
59+
60+
writer.incrementState();
61+
62+
if (msg.finalChunk)
63+
return true;
64+
65+
case 2:
66+
if (!writer.writeByteArray(msg.chunk))
67+
return false;
68+
69+
msg.chunk = null;
70+
71+
writer.decrementState();
72+
}
73+
}
74+
}
75+
76+
/** {@inheritDoc} */
77+
@Override public boolean readFrom(CompressedMessage msg, MessageReader reader) {
78+
if (msg.tmpBuf == null)
79+
msg.tmpBuf = ByteBuffer.allocateDirect(BUFFER_CAPACITY);
80+
81+
assert msg.chunk == null : msg.chunk;
82+
83+
while (true) {
84+
switch (reader.state()) {
85+
case 0:
86+
msg.dataSize = reader.readInt();
87+
88+
if (!reader.isLastRead())
89+
return false;
90+
91+
if (msg.dataSize == 0)
92+
return true;
93+
94+
reader.incrementState();
95+
96+
case 1:
97+
msg.finalChunk = reader.readBoolean();
98+
99+
if (!reader.isLastRead())
100+
return false;
101+
102+
if (msg.finalChunk)
103+
return true;
104+
105+
reader.incrementState();
106+
107+
case 2:
108+
msg.chunk = reader.readByteArray();
109+
110+
if (!reader.isLastRead())
111+
return false;
112+
113+
if (msg.chunk != null) {
114+
if (msg.tmpBuf.remaining() <= CHUNK_SIZE) {
115+
ByteBuffer newTmpBuf = ByteBuffer.allocateDirect(msg.tmpBuf.capacity() * 2);
116+
117+
msg.tmpBuf.flip();
118+
119+
newTmpBuf.put(msg.tmpBuf);
120+
121+
msg.tmpBuf = newTmpBuf;
122+
}
123+
124+
msg.tmpBuf.put(msg.chunk);
125+
126+
reader.decrementState();
127+
128+
msg.chunk = null;
129+
}
130+
}
131+
}
132+
}
133+
}

0 commit comments

Comments
 (0)