Skip to content

Commit f6a8e8f

Browse files
committed
stabilize ssl implementation
1 parent b5bc8f9 commit f6a8e8f

3 files changed

Lines changed: 63 additions & 72 deletions

File tree

rlib-network/src/loadTest/java/javasabr/rlib/network/StringSslNetworkLoadTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import javasabr.rlib.network.impl.StringDataConnection;
2323
import javasabr.rlib.network.impl.StringDataSslConnection;
2424
import javasabr.rlib.network.packet.impl.AbstractSslNetworkPacketReader;
25+
import javasabr.rlib.network.packet.impl.AbstractSslNetworkPacketWriter;
2526
import javasabr.rlib.network.packet.impl.StringWritableNetworkPacket;
2627
import javasabr.rlib.network.server.ServerNetwork;
2728
import javasabr.rlib.network.util.NetworkUtils;
@@ -82,7 +83,7 @@ void connectAndSendMessages(
8283
int delay = random.nextInt(MAX_SEND_DELAY);
8384
ScheduledFuture<?> schedule = executor.schedule(
8485
() -> {
85-
StringWritableNetworkPacket message = newMessage(10, 100); // 10240
86+
StringWritableNetworkPacket message = newMessage(10, 10240); // 10240
8687
connection.send(message);
8788
}, delay, TimeUnit.MILLISECONDS);
8889
tasks.add(schedule);
@@ -123,6 +124,7 @@ private static class StatisticsCollector {
123124
void testServerWithMultiplyClients() {
124125
LoggerManager.enable(StringSslNetworkLoadTest.class, LoggerLevel.INFO);
125126
//LoggerManager.enable(AbstractSslNetworkPacketReader.class, LoggerLevel.DEBUG);
127+
//LoggerManager.enable(AbstractSslNetworkPacketWriter.class, LoggerLevel.DEBUG);
126128

127129
var serverConfig = SimpleServerNetworkConfig
128130
.builder()
@@ -135,8 +137,8 @@ void testServerWithMultiplyClients() {
135137
var serverAllocator = new DefaultBufferAllocator(serverConfig);
136138
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
137139

138-
int clientCount = 1;
139-
int messagesPerIteration = 20;
140+
int clientCount = 100;
141+
int messagesPerIteration = 2000;
140142
int expectedMessages = clientCount * messagesPerIteration * MAX_ITERATIONS;
141143

142144
var finalWaiter = new CountDownLatch(2);
@@ -154,7 +156,7 @@ void testServerWithMultiplyClients() {
154156
statistics
155157
.receivedClientPackersPerSecond()
156158
.accumulate(1);
157-
//connection.send(new StringWritableNetworkPacket("Echo: " + packet.data()));
159+
connection.send(new StringWritableNetworkPacket("Echo: " + packet.data()));
158160
statistics
159161
.sentEchoPackersPerSecond()
160162
.accumulate(1);

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ protected int readPackets(ByteBuffer readingBuffer) {
9292
}
9393

9494
protected ByteBuffer moveDataToNetworkBuffer(ByteBuffer readingBuffer) {
95+
log.debug(remoteAddress(), readingBuffer,
96+
(address, buf) -> "[%s] Append new part of received data:\n%s".formatted(address, hexDump(buf)));
9597
ByteBuffer sslNetworkBuffer = sslNetworkBuffer();
9698
int availableSpace = sslNetworkBuffer.capacity() - sslNetworkBuffer.limit();
9799
if (availableSpace >= readingBuffer.limit()) {
@@ -100,16 +102,18 @@ protected ByteBuffer moveDataToNetworkBuffer(ByteBuffer readingBuffer) {
100102
sslNetworkBuffer = increaseNetworkBuffer(readingBuffer.limit());
101103
BufferUtils.appendAndClear(sslNetworkBuffer, readingBuffer);
102104
}
105+
log.debug(remoteAddress(), sslNetworkBuffer,
106+
(address, buf) -> "[%s] Result pending received network data:\n%s".formatted(address, hexDump(buf)));
103107
return sslNetworkBuffer;
104108
}
105109

106110
protected int doHandshake(ByteBuffer networkBuffer, int receivedBytes) {
107111
HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
108112
while (SslUtils.needToProcess(handshakeStatus)) {
109-
log.debug(handshakeStatus, "Do handshake with status:[%s] "::formatted);
110-
SSLEngineResult result;
113+
log.debug(remoteAddress(), handshakeStatus, "[%s] Do handshake with status:[%s] "::formatted);
111114
switch (handshakeStatus) {
112115
case NEED_UNWRAP: {
116+
SSLEngineResult result;
113117
if (receivedBytes == -1) {
114118
if (sslEngine.isInboundDone() && sslEngine.isOutboundDone()) {
115119
return SKIP_READ_PACKETS;
@@ -128,10 +132,11 @@ protected int doHandshake(ByteBuffer networkBuffer, int receivedBytes) {
128132
return SKIP_READ_PACKETS;
129133
}
130134
try {
131-
log.debug(networkBuffer, buff -> "Try to unwrap data:\n" + hexDump(buff));
135+
log.debug(remoteAddress(), networkBuffer,
136+
(address, buff) -> "[%s] Try to unwrap data:\n%s".formatted(address, hexDump(buff)));
132137
result = sslEngine.unwrap(networkBuffer, EMPTY_BUFFERS);
133138
handshakeStatus = result.getHandshakeStatus();
134-
log.debug(handshakeStatus, "Handshake status:[%s] after unwrapping"::formatted);
139+
log.debug(remoteAddress(), handshakeStatus, "[%s] Handshake status:[%s] after unwrapping"::formatted);
135140
} catch (SSLException sslException) {
136141
log.error("A problem was encountered while processing the data that caused the "
137142
+ "SSLEngine to abort. Will try to properly close connection...");
@@ -147,9 +152,13 @@ protected int doHandshake(ByteBuffer networkBuffer, int receivedBytes) {
147152
throw new IllegalStateException("Unexpected ssl engine result");
148153
}
149154
case BUFFER_UNDERFLOW: {
150-
log.debug("Increase ssl network buffer");
151-
increaseNetworkBuffer(0);
152-
break;
155+
log.debug(remoteAddress(), "[%s] Wait for more received data..."::formatted);
156+
if (networkBuffer.position() > 0) {
157+
networkBuffer
158+
.compact()
159+
.limit(networkBuffer.position());
160+
}
161+
return SKIP_READ_PACKETS;
153162
}
154163
case CLOSED: {
155164
if (sslEngine.isOutboundDone()) {
@@ -174,7 +183,7 @@ protected int doHandshake(ByteBuffer networkBuffer, int receivedBytes) {
174183
}
175184
case NEED_TASK: {
176185
handshakeStatus = SslUtils.executeSslTasks(sslEngine);
177-
log.debug(handshakeStatus, "Handshake status:[%s] after engine tasks"::formatted);
186+
log.debug(remoteAddress(), handshakeStatus, "[%s] Handshake status:[%s] after engine tasks"::formatted);
178187
if (handshakeStatus == HandshakeStatus.NEED_UNWRAP && !networkBuffer.hasRemaining()) {
179188
cleanNetworkBuffer(networkBuffer);
180189
return SKIP_READ_PACKETS;
@@ -204,23 +213,27 @@ protected int decryptAndRead(ByteBuffer receivedBuffer) {
204213
while (receivedBuffer.hasRemaining()) {
205214
SSLEngineResult result;
206215
try {
207-
log.debug(receivedBuffer, buf -> "Try to decrypt data:\n" + hexDump(buf));
216+
log.debug(remoteAddress(), receivedBuffer,
217+
(address, buf) -> "[%s] Try to decrypt data:\n%s".formatted(address, hexDump(buf)));
208218
result = sslEngine.unwrap(receivedBuffer, sslDataBuffer.clear());
209219
} catch (SSLException e) {
210220
throw new IllegalStateException(e);
211221
}
212222
switch (result.getStatus()) {
213223
case OK: {
214224
sslDataBuffer.flip();
215-
log.debug(sslDataBuffer, buf -> "Decrypted data:\n" + hexDump(buf));
225+
log.debug(remoteAddress(), sslDataBuffer,
226+
(address, buf) -> "[%s] Decrypted data:\n%s".formatted(address, hexDump(buf)));
216227
total += readPackets(sslDataBuffer, sslDataPendingBuffer);
217228
break;
218229
}
219230
case BUFFER_OVERFLOW: {
231+
log.debug(remoteAddress(), "Increase SSL data buffer and try again..."::formatted);
220232
increaseDataBuffer();
221233
return decryptAndRead(receivedBuffer);
222234
}
223235
case BUFFER_UNDERFLOW: {
236+
log.debug(remoteAddress(), "[%s] Wait for more received data..."::formatted);
224237
if (receivedBuffer.position() > 0) {
225238
receivedBuffer
226239
.compact()
@@ -233,16 +246,13 @@ protected int decryptAndRead(ByteBuffer receivedBuffer) {
233246
return SKIP_READ_PACKETS;
234247
}
235248
default: {
236-
if (receivedBuffer.position() > 0) {
237-
receivedBuffer.compact();
238-
return total;
239-
}
240249
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
241250
}
242251
}
243252
}
244253

245-
receivedBuffer.clear();
254+
log.debug(remoteAddress(), "[%s] Clear SSL network buffer"::formatted);
255+
cleanNetworkBuffer(receivedBuffer);
246256
return total;
247257
}
248258

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketWriter.java

Lines changed: 32 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ protected boolean tryToSendNextPacketImpl() {
8585
return super.tryToSendNextPacketImpl();
8686
}
8787

88-
ByteBuffer dataToSend = doHandshake(SslWritableNetworkPacket.getInstance());
88+
ByteBuffer dataToSend = doHandshake(status);
8989
if (dataToSend != null) {
9090
return writeBuffer(dataToSend, null);
9191
}
@@ -95,49 +95,38 @@ protected boolean tryToSendNextPacketImpl() {
9595

9696
@Override
9797
protected ByteBuffer serialize(WritableNetworkPacket packet) {
98-
HandshakeStatus status = sslEngine.getHandshakeStatus();
99-
if (SslUtils.isReadyToCrypt(status)) {
100-
if (packet instanceof SslWritableNetworkPacket) {
101-
return EMPTY_BUFFER;
102-
}
103-
104-
ByteBuffer serialized = super.serialize(packet);
98+
if (packet instanceof SslWritableNetworkPacket) {
99+
return EMPTY_BUFFER;
100+
}
105101

106-
log.debug(remoteAddress(), serialized, (address, buff) ->
107-
"[%s] Try to encrypt data:\n%s".formatted(address, hexDump(buff)));
102+
ByteBuffer serialized = super.serialize(packet);
103+
log.debug(remoteAddress(), serialized, (address, buff) ->
104+
"[%s] Try to encrypt data:\n%s".formatted(address, hexDump(buff)));
108105

109-
ByteBuffer bufferToSend = sslNetworkBuffer();
110-
SSLEngineResult result = tryEncrypt(serialized, bufferToSend);
106+
ByteBuffer bufferToSend = sslNetworkBuffer();
107+
SSLEngineResult result = tryEncrypt(serialized, bufferToSend);
111108

112-
if (result.getStatus() == SSLEngineResult.Status.OK && serialized.hasRemaining()) {
113-
int tempBufferSize = (int) ((bufferToSend.limit() + serialized.remaining()) * 1.2);
114-
ByteBuffer tempBuffer = bufferAllocator.takeBuffer(tempBufferSize);
115-
tempBuffer.put(bufferToSend.flip());
116-
while (serialized.hasRemaining()) {
117-
result = tryEncrypt(serialized, bufferToSend);
118-
if (result.getStatus() != SSLEngineResult.Status.OK) {
119-
break;
120-
}
121-
tempBuffer.put(bufferToSend.flip());
109+
if (result.getStatus() == SSLEngineResult.Status.OK && serialized.hasRemaining()) {
110+
int tempBufferSize = (int) ((bufferToSend.limit() + serialized.remaining()) * 1.2);
111+
ByteBuffer tempBuffer = bufferAllocator.takeBuffer(tempBufferSize);
112+
tempBuffer.put(bufferToSend.flip());
113+
while (serialized.hasRemaining()) {
114+
result = tryEncrypt(serialized, bufferToSend);
115+
if (result.getStatus() != SSLEngineResult.Status.OK) {
116+
break;
122117
}
123-
bufferToSend = tempBuffer;
124-
this.sslTempBuffer = tempBuffer;
118+
tempBuffer.put(bufferToSend.flip());
125119
}
126-
127-
return switch (result.getStatus()) {
128-
case BUFFER_UNDERFLOW -> increaseAndTryAgain(packet);
129-
case BUFFER_OVERFLOW -> throw new IllegalStateException("Unexpected ssl engine result");
130-
case OK -> bufferToSend.flip();
131-
case CLOSED -> closeAndReturn();
132-
};
133-
}
134-
135-
ByteBuffer dataToSend = doHandshake(packet);
136-
if (dataToSend != null) {
137-
return dataToSend;
120+
bufferToSend = tempBuffer;
121+
this.sslTempBuffer = tempBuffer;
138122
}
139123

140-
throw new IllegalStateException();
124+
return switch (result.getStatus()) {
125+
case BUFFER_UNDERFLOW -> increaseAndTryAgain(packet);
126+
case BUFFER_OVERFLOW -> throw new IllegalStateException("Unexpected ssl engine result");
127+
case OK -> bufferToSend.flip();
128+
case CLOSED -> closeAndReturn();
129+
};
141130
}
142131

143132
protected ByteBuffer closeAndReturn() {
@@ -161,17 +150,12 @@ protected SSLEngineResult tryEncrypt(ByteBuffer source, ByteBuffer destination)
161150
}
162151

163152
@Nullable
164-
protected ByteBuffer doHandshake(WritableNetworkPacket packet) {
165-
if (!(packet instanceof SslWritableNetworkPacket)) {
166-
log.debug(remoteAddress(), packet, "[%s] Return packet:[%s] to queue as first"::formatted);
167-
queueAtFirst.accept(packet);
168-
}
169-
170-
HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
153+
protected ByteBuffer doHandshake(HandshakeStatus handshakeStatus) {
171154
while (SslUtils.needToProcess(handshakeStatus)) {
172-
SSLEngineResult result;
155+
log.debug(remoteAddress(), handshakeStatus, "[%s] Do handshake with status:[%s] "::formatted);
173156
switch (handshakeStatus) {
174157
case NEED_WRAP: {
158+
SSLEngineResult result;
175159
try {
176160
result = sslEngine.wrap(EMPTY_BUFFERS, sslNetworkBuffer.clear());
177161
handshakeStatus = result.getHandshakeStatus();
@@ -182,16 +166,11 @@ protected ByteBuffer doHandshake(WritableNetworkPacket packet) {
182166
handshakeStatus = sslEngine.getHandshakeStatus();
183167
break;
184168
}
185-
186169
switch (result.getStatus()) {
187170
case OK:
188-
sslNetworkBuffer.flip();
189-
/* if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
190-
log.debug("Send command to wrap data again");
191-
queueAtFirst.accept(SslWritableNetworkPacket.getInstance());
192-
}*/
193-
log.debug(sslNetworkBuffer, result, (buf, res) -> "Send wrapped data:\n" + hexDump(buf, res));
194-
return sslNetworkBuffer;
171+
log.debug(remoteAddress(), sslNetworkBuffer, result,
172+
(address, buf, res) -> "[%s] Send wrapped data:\n%s".formatted(address, hexDump(buf, res)));
173+
return sslNetworkBuffer.flip();
195174
case BUFFER_OVERFLOW: {
196175
log.debug(remoteAddress(), "[%s] Increase network buffer"::formatted);
197176
increaseNetworkBuffer();

0 commit comments

Comments
 (0)