Skip to content

Commit 6015fa8

Browse files
committed
continue fix ssl network implementation
1 parent 412db3a commit 6015fa8

9 files changed

Lines changed: 150 additions & 113 deletions

File tree

rlib-network/src/loadTest/java/javasabr/rlib/network/StringSslNetworkLoadTest.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ void connectAndSendMessages(
8282
int delay = random.nextInt(MAX_SEND_DELAY);
8383
ScheduledFuture<?> schedule = executor.schedule(
8484
() -> {
85-
StringWritableNetworkPacket message = newMessage(10, 10240); // 10240
85+
StringWritableNetworkPacket message = newMessage(12000, 15000); // 10240
8686
connection.send(message);
8787
}, delay, TimeUnit.MILLISECONDS);
8888
tasks.add(schedule);
@@ -122,7 +122,7 @@ private static class StatisticsCollector {
122122
@SneakyThrows
123123
void testServerWithMultiplyClients() {
124124
LoggerManager.enable(StringSslNetworkLoadTest.class, LoggerLevel.INFO);
125-
LoggerManager.enable(AbstractSslNetworkPacketReader.class, LoggerLevel.DEBUG);
125+
//LoggerManager.enable(AbstractSslNetworkPacketReader.class, LoggerLevel.DEBUG);
126126

127127
var serverConfig = SimpleServerNetworkConfig
128128
.builder()
@@ -136,7 +136,7 @@ void testServerWithMultiplyClients() {
136136
ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
137137

138138
int clientCount = 1;
139-
int messagesPerIteration = 300;
139+
int messagesPerIteration = 20;
140140
int expectedMessages = clientCount * messagesPerIteration * MAX_ITERATIONS;
141141

142142
var finalWaiter = new CountDownLatch(2);
@@ -214,6 +214,8 @@ private static void initReceivedMessagesTracker(
214214
}
215215

216216
private static StringWritableNetworkPacket newMessage(int minMessageLength, int maxMessageLength) {
217-
return new StringWritableNetworkPacket(StringUtils.generate(minMessageLength, maxMessageLength));
217+
String generate = StringUtils.generate(minMessageLength, maxMessageLength);
218+
return new StringWritableNetworkPacket("a".repeat(generate.length()));
219+
//return new StringWritableNetworkPacket(StringUtils.generate(minMessageLength, maxMessageLength));
218220
}
219221
}

rlib-network/src/main/java/javasabr/rlib/network/BufferAllocator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package javasabr.rlib.network;
22

33
import java.nio.ByteBuffer;
4+
import org.jspecify.annotations.Nullable;
45

56
/**
67
* The interface to implement a buffer allocator for network things.

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static javasabr.rlib.network.util.NetworkUtils.hexDump;
55

66
import java.io.IOException;
7+
import java.nio.BufferOverflowException;
78
import java.nio.ByteBuffer;
89
import java.nio.channels.AsynchronousSocketChannel;
910
import java.nio.channels.CompletionHandler;
@@ -116,7 +117,6 @@ public boolean tryToSendNextPacket() {
116117
}
117118

118119
serializedToChannelPacketHandler.accept(nextPacket);
119-
120120
return startedWriting;
121121
}
122122

@@ -143,9 +143,25 @@ protected ByteBuffer serialize(WritableNetworkPacket packet) {
143143
ByteBuffer second = bufferAllocator.takeBuffer(totalSize);
144144
firstWriteTempBuffer = first;
145145
secondWriteTempBuffer = second;
146-
return serialize(resultPacket, expectedLength, totalSize, first, second);
146+
try {
147+
return serialize(resultPacket, expectedLength, totalSize, first, second);
148+
} catch (BufferOverflowException ex) {
149+
log.error(ex);
150+
bufferAllocator.putBuffer(first);
151+
bufferAllocator.putBuffer(second);
152+
firstWriteTempBuffer = null;
153+
secondWriteTempBuffer = null;
154+
throw new RuntimeException(ex);
155+
}
147156
} else {
148-
return serialize(resultPacket, expectedLength, totalSize, firstWriteBuffer, secondWriteBuffer);
157+
try {
158+
return serialize(resultPacket, expectedLength, totalSize, firstWriteBuffer, secondWriteBuffer);
159+
} catch (BufferOverflowException ex) {
160+
log.error(ex);
161+
firstWriteBuffer.clear();
162+
secondWriteBuffer.clear();
163+
throw new RuntimeException(ex);
164+
}
149165
}
150166
}
151167

@@ -359,29 +375,25 @@ protected void handleFailedWritingData(Throwable exception, WritableNetworkPacke
359375

360376
@Override
361377
public void close() {
362-
363378
bufferAllocator
364379
.putWriteBuffer(firstWriteBuffer)
365380
.putWriteBuffer(secondWriteBuffer);
366-
367381
clearTempBuffers();
368-
369382
writingBuffer = EMPTY_BUFFER;
370383
}
371384

372385
protected void clearTempBuffers() {
373386

374-
var secondWriteTempBuffer = this.secondWriteTempBuffer;
375387
var firstWriteTempBuffer = this.firstWriteTempBuffer;
388+
if (firstWriteTempBuffer != null) {
389+
this.firstWriteTempBuffer = null;
390+
bufferAllocator.putBuffer(firstWriteTempBuffer);
391+
}
376392

393+
var secondWriteTempBuffer = this.secondWriteTempBuffer;
377394
if (secondWriteTempBuffer != null) {
378395
this.secondWriteTempBuffer = null;
379396
bufferAllocator.putBuffer(secondWriteTempBuffer);
380397
}
381-
382-
if (firstWriteTempBuffer != null) {
383-
this.firstWriteTempBuffer = null;
384-
bufferAllocator.putBuffer(firstWriteTempBuffer);
385-
}
386398
}
387399
}

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,17 @@ protected void handleReceivedData(int receivedBytes, ByteBuffer readingBuffer) {
8484
protected int readPackets(ByteBuffer readingBuffer) {
8585
ByteBuffer networkBuffer = moveDataToNetworkBuffer(readingBuffer);
8686
HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
87-
if (SslUtils.isReadyToDecrypt(handshakeStatus)) {
87+
if (SslUtils.isReadyToCrypt(handshakeStatus)) {
8888
return decryptAndRead(networkBuffer);
8989
} else {
9090
return doHandshake(networkBuffer, networkBuffer.limit());
9191
}
9292
}
9393

94-
private ByteBuffer moveDataToNetworkBuffer(ByteBuffer readingBuffer) {
94+
protected ByteBuffer moveDataToNetworkBuffer(ByteBuffer readingBuffer) {
9595
ByteBuffer sslNetworkBuffer = sslNetworkBuffer();
96-
int freeSpace = sslNetworkBuffer.capacity() - sslNetworkBuffer.limit();
97-
if (freeSpace >= readingBuffer.limit()) {
96+
int availableSpace = sslNetworkBuffer.capacity() - sslNetworkBuffer.limit();
97+
if (availableSpace >= readingBuffer.limit()) {
9898
BufferUtils.appendAndClear(sslNetworkBuffer, readingBuffer);
9999
} else {
100100
sslNetworkBuffer = increaseNetworkBuffer(readingBuffer.limit());
@@ -103,7 +103,7 @@ private ByteBuffer moveDataToNetworkBuffer(ByteBuffer readingBuffer) {
103103
return sslNetworkBuffer;
104104
}
105105

106-
protected int doHandshake(ByteBuffer receivedBuffer, int receivedBytes) {
106+
protected int doHandshake(ByteBuffer networkBuffer, int receivedBytes) {
107107
HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
108108
while (SslUtils.needToProcess(handshakeStatus)) {
109109
log.debug(handshakeStatus, "Do handshake with status:[%s] "::formatted);
@@ -123,13 +123,13 @@ protected int doHandshake(ByteBuffer receivedBuffer, int receivedBytes) {
123123
sslEngine.closeOutbound();
124124
handshakeStatus = sslEngine.getHandshakeStatus();
125125
break;
126-
} else if (!receivedBuffer.hasRemaining()) {
127-
receivedBuffer.clear().limit(0);
126+
} else if (!networkBuffer.hasRemaining()) {
127+
cleanNetworkBuffer(networkBuffer);
128128
return SKIP_READ_PACKETS;
129129
}
130130
try {
131-
log.debug(receivedBuffer, buff -> "Try to unwrap data:\n" + hexDump(buff));
132-
result = sslEngine.unwrap(receivedBuffer, EMPTY_BUFFERS);
131+
log.debug(networkBuffer, buff -> "Try to unwrap data:\n" + hexDump(buff));
132+
result = sslEngine.unwrap(networkBuffer, EMPTY_BUFFERS);
133133
handshakeStatus = result.getHandshakeStatus();
134134
log.debug(handshakeStatus, "Handshake status:[%s] after unwrapping"::formatted);
135135
} catch (SSLException sslException) {
@@ -167,21 +167,16 @@ protected int doHandshake(ByteBuffer receivedBuffer, int receivedBytes) {
167167
break;
168168
}
169169
case NEED_WRAP: {
170-
log.debug("Send command to wrap data");
170+
log.debug(remoteAddress(), "[%s] Send command to wrap data"::formatted);
171171
packetWriter.accept(SslWritableNetworkPacket.getInstance());
172-
receivedBuffer.clear().limit(0);
172+
cleanNetworkBuffer(networkBuffer);
173173
return SKIP_READ_PACKETS;
174174
}
175175
case NEED_TASK: {
176-
Runnable task;
177-
while ((task = sslEngine.getDelegatedTask()) != null) {
178-
log.debug(task, "Execute SSL Engine's task:[%s]"::formatted);
179-
task.run();
180-
}
181-
handshakeStatus = sslEngine.getHandshakeStatus();
176+
handshakeStatus = SslUtils.executeSslTasks(sslEngine);
182177
log.debug(handshakeStatus, "Handshake status:[%s] after engine tasks"::formatted);
183-
if (handshakeStatus == HandshakeStatus.NEED_UNWRAP && !receivedBuffer.hasRemaining()) {
184-
receivedBuffer.clear().limit(0);
178+
if (handshakeStatus == HandshakeStatus.NEED_UNWRAP && !networkBuffer.hasRemaining()) {
179+
cleanNetworkBuffer(networkBuffer);
185180
return SKIP_READ_PACKETS;
186181
}
187182
break;
@@ -192,16 +187,16 @@ protected int doHandshake(ByteBuffer receivedBuffer, int receivedBytes) {
192187
}
193188
}
194189

195-
if (!receivedBuffer.hasRemaining()) {
190+
if (!networkBuffer.hasRemaining()) {
196191
// if buffer is empty and status is FINISHED then we can notify writer
197192
if (handshakeStatus == HandshakeStatus.FINISHED) {
198193
packetWriter.accept(SslWritableNetworkPacket.getInstance());
199194
}
200-
receivedBuffer.clear().limit(0);
195+
cleanNetworkBuffer(networkBuffer);
201196
return SKIP_READ_PACKETS;
202197
}
203198

204-
return decryptAndRead(receivedBuffer);
199+
return decryptAndRead(networkBuffer);
205200
}
206201

207202
protected int decryptAndRead(ByteBuffer receivedBuffer) {
@@ -251,7 +246,7 @@ protected int decryptAndRead(ByteBuffer receivedBuffer) {
251246
return total;
252247
}
253248

254-
private synchronized ByteBuffer increaseNetworkBuffer(int extra) {
249+
protected synchronized ByteBuffer increaseNetworkBuffer(int extra) {
255250
ByteBuffer current = sslNetworkBuffer();
256251
int newSize = (int) Math.max(current.capacity() * 1.3, current.capacity() + extra);
257252
sslNetworkBuffer = NetworkUtils
@@ -260,7 +255,7 @@ private synchronized ByteBuffer increaseNetworkBuffer(int extra) {
260255
return sslNetworkBuffer;
261256
}
262257

263-
private synchronized void increaseDataBuffer() {
258+
protected synchronized void increaseDataBuffer() {
264259
int newSize = sslEngine
265260
.getSession()
266261
.getApplicationBufferSize();
@@ -281,4 +276,8 @@ public void close() {
281276
.putBuffer(sslNetworkBuffer);
282277
super.close();
283278
}
279+
280+
protected static void cleanNetworkBuffer(ByteBuffer networkBuffer) {
281+
networkBuffer.clear().limit(0);
282+
}
284283
}

0 commit comments

Comments
 (0)