Skip to content

Commit 115a680

Browse files
committed
finish updating ssl network implementation
1 parent f6a8e8f commit 115a680

9 files changed

Lines changed: 145 additions & 154 deletions

File tree

rlib-network/src/loadTest/java/javasabr/rlib/network/StringSslNetworkLoadTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ private static void initReceivedMessagesTracker(
198198
.getThenReset();
199199
log.info(received, "Server receiving [%s] messages/sec"::formatted);
200200
if (received == 0) {
201-
// finalWaiter.countDown();
201+
finalWaiter.countDown();
202202
}
203203
}, 1, 1, TimeUnit.SECONDS);
204204
scheduledExecutor.scheduleAtFixedRate(() -> {
@@ -216,8 +216,6 @@ private static void initReceivedMessagesTracker(
216216
}
217217

218218
private static StringWritableNetworkPacket newMessage(int minMessageLength, int maxMessageLength) {
219-
String generate = StringUtils.generate(minMessageLength, maxMessageLength);
220-
return new StringWritableNetworkPacket("a".repeat(generate.length()));
221-
//return new StringWritableNetworkPacket(StringUtils.generate(minMessageLength, maxMessageLength));
219+
return new StringWritableNetworkPacket(StringUtils.generate(minMessageLength, maxMessageLength));
222220
}
223221
}

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java

Lines changed: 67 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -91,26 +91,11 @@ protected int readPackets(ByteBuffer readingBuffer) {
9191
}
9292
}
9393

94-
protected ByteBuffer moveDataToNetworkBuffer(ByteBuffer readingBuffer) {
95-
log.debug(remoteAddress(), readingBuffer,
96-
(address, buf) -> "[%s] Append new part of received data:\n%s".formatted(address, hexDump(buf)));
97-
ByteBuffer sslNetworkBuffer = sslNetworkBuffer();
98-
int availableSpace = sslNetworkBuffer.capacity() - sslNetworkBuffer.limit();
99-
if (availableSpace >= readingBuffer.limit()) {
100-
BufferUtils.appendAndClear(sslNetworkBuffer, readingBuffer);
101-
} else {
102-
sslNetworkBuffer = increaseNetworkBuffer(readingBuffer.limit());
103-
BufferUtils.appendAndClear(sslNetworkBuffer, readingBuffer);
104-
}
105-
log.debug(remoteAddress(), sslNetworkBuffer,
106-
(address, buf) -> "[%s] Result pending received network data:\n%s".formatted(address, hexDump(buf)));
107-
return sslNetworkBuffer;
108-
}
109-
11094
protected int doHandshake(ByteBuffer networkBuffer, int receivedBytes) {
11195
HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
96+
String remoteAddress = remoteAddress();
11297
while (SslUtils.needToProcess(handshakeStatus)) {
113-
log.debug(remoteAddress(), handshakeStatus, "[%s] Do handshake with status:[%s] "::formatted);
98+
log.debug(remoteAddress, handshakeStatus, "[%s] Do handshake with status:[%s] "::formatted);
11499
switch (handshakeStatus) {
115100
case NEED_UNWRAP: {
116101
SSLEngineResult result;
@@ -128,15 +113,14 @@ protected int doHandshake(ByteBuffer networkBuffer, int receivedBytes) {
128113
handshakeStatus = sslEngine.getHandshakeStatus();
129114
break;
130115
} else if (!networkBuffer.hasRemaining()) {
131-
cleanNetworkBuffer(networkBuffer);
116+
NetworkUtils.cleanNetworkBuffer(networkBuffer);
132117
return SKIP_READ_PACKETS;
133118
}
134119
try {
135-
log.debug(remoteAddress(), networkBuffer,
136-
(address, buff) -> "[%s] Try to unwrap data:\n%s".formatted(address, hexDump(buff)));
120+
logDataBeforeUnwrap(remoteAddress, networkBuffer);
137121
result = sslEngine.unwrap(networkBuffer, EMPTY_BUFFERS);
138122
handshakeStatus = result.getHandshakeStatus();
139-
log.debug(remoteAddress(), handshakeStatus, "[%s] Handshake status:[%s] after unwrapping"::formatted);
123+
log.debug(remoteAddress, handshakeStatus, "[%s] Handshake status:[%s] after unwrapping"::formatted);
140124
} catch (SSLException sslException) {
141125
log.error("A problem was encountered while processing the data that caused the "
142126
+ "SSLEngine to abort. Will try to properly close connection...");
@@ -149,15 +133,11 @@ protected int doHandshake(ByteBuffer networkBuffer, int receivedBytes) {
149133
break;
150134
}
151135
case BUFFER_OVERFLOW: {
152-
throw new IllegalStateException("Unexpected ssl engine result");
136+
throw new IllegalStateException("Unexpected SSL Engine result:" + result.getStatus());
153137
}
154138
case BUFFER_UNDERFLOW: {
155-
log.debug(remoteAddress(), "[%s] Wait for more received data..."::formatted);
156-
if (networkBuffer.position() > 0) {
157-
networkBuffer
158-
.compact()
159-
.limit(networkBuffer.position());
160-
}
139+
log.debug(remoteAddress, "[%s] Wait for more received data..."::formatted);
140+
NetworkUtils.compactNetworkBufferIfNeed(networkBuffer);
161141
return SKIP_READ_PACKETS;
162142
}
163143
case CLOSED: {
@@ -170,92 +150,105 @@ protected int doHandshake(ByteBuffer networkBuffer, int receivedBytes) {
170150
}
171151
}
172152
default: {
173-
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
153+
throw new IllegalStateException("Invalid SSL Engine result:" + result.getStatus());
174154
}
175155
}
176156
break;
177157
}
178158
case NEED_WRAP: {
179-
log.debug(remoteAddress(), "[%s] Send command to wrap data"::formatted);
180-
packetWriter.accept(SslWritableNetworkPacket.getInstance());
181-
cleanNetworkBuffer(networkBuffer);
159+
log.debug(remoteAddress, "[%s] Send command to wrap data"::formatted);
160+
packetWriter.accept(SslWrapRequestPacket.getInstance());
161+
NetworkUtils.cleanNetworkBuffer(networkBuffer);
182162
return SKIP_READ_PACKETS;
183163
}
184164
case NEED_TASK: {
185165
handshakeStatus = SslUtils.executeSslTasks(sslEngine);
186-
log.debug(remoteAddress(), handshakeStatus, "[%s] Handshake status:[%s] after engine tasks"::formatted);
166+
log.debug(remoteAddress, handshakeStatus, "[%s] Handshake status:[%s] after engine tasks"::formatted);
187167
if (handshakeStatus == HandshakeStatus.NEED_UNWRAP && !networkBuffer.hasRemaining()) {
188-
cleanNetworkBuffer(networkBuffer);
168+
NetworkUtils.cleanNetworkBuffer(networkBuffer);
189169
return SKIP_READ_PACKETS;
190170
}
191171
break;
192172
}
193173
default: {
194-
throw new IllegalStateException("Invalid SSL status: " + handshakeStatus);
174+
throw new IllegalStateException("Invalid SSL status:" + handshakeStatus);
195175
}
196176
}
197177
}
198178

199179
if (!networkBuffer.hasRemaining()) {
200180
// if buffer is empty and status is FINISHED then we can notify writer
201181
if (handshakeStatus == HandshakeStatus.FINISHED) {
202-
packetWriter.accept(SslWritableNetworkPacket.getInstance());
182+
packetWriter.accept(SslWrapRequestPacket.getInstance());
203183
}
204-
cleanNetworkBuffer(networkBuffer);
184+
NetworkUtils.cleanNetworkBuffer(networkBuffer);
205185
return SKIP_READ_PACKETS;
206186
}
207187

208188
return decryptAndRead(networkBuffer);
209189
}
210190

211191
protected int decryptAndRead(ByteBuffer receivedBuffer) {
192+
String remoteAddress = remoteAddress();
212193
int total = 0;
213194
while (receivedBuffer.hasRemaining()) {
195+
ByteBuffer sslDataBuffer = sslDataBuffer();
214196
SSLEngineResult result;
215197
try {
216-
log.debug(remoteAddress(), receivedBuffer,
217-
(address, buf) -> "[%s] Try to decrypt data:\n%s".formatted(address, hexDump(buf)));
198+
logDataBeforeDecrypt(remoteAddress, receivedBuffer);
218199
result = sslEngine.unwrap(receivedBuffer, sslDataBuffer.clear());
219200
} catch (SSLException e) {
220201
throw new IllegalStateException(e);
221202
}
222203
switch (result.getStatus()) {
223204
case OK: {
224205
sslDataBuffer.flip();
225-
log.debug(remoteAddress(), sslDataBuffer,
226-
(address, buf) -> "[%s] Decrypted data:\n%s".formatted(address, hexDump(buf)));
206+
logDataAfterDecrypt(remoteAddress, sslDataBuffer);
227207
total += readPackets(sslDataBuffer, sslDataPendingBuffer);
228208
break;
229209
}
230210
case BUFFER_OVERFLOW: {
231-
log.debug(remoteAddress(), "Increase SSL data buffer and try again..."::formatted);
211+
log.debug(remoteAddress, "[%s] Increase SSL data buffer and try again..."::formatted);
232212
increaseDataBuffer();
233213
return decryptAndRead(receivedBuffer);
234214
}
235215
case BUFFER_UNDERFLOW: {
236-
log.debug(remoteAddress(), "[%s] Wait for more received data..."::formatted);
237-
if (receivedBuffer.position() > 0) {
238-
receivedBuffer
239-
.compact()
240-
.limit(receivedBuffer.position());
241-
}
216+
log.debug(remoteAddress, "[%s] Wait for more received data..."::formatted);
217+
NetworkUtils.compactNetworkBufferIfNeed(receivedBuffer);
242218
return SKIP_READ_PACKETS;
243219
}
244220
case CLOSED: {
245221
connection.close();
246222
return SKIP_READ_PACKETS;
247223
}
248224
default: {
249-
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
225+
throw new IllegalStateException("Invalid SSL status:" + result.getStatus());
250226
}
251227
}
252228
}
253229

254-
log.debug(remoteAddress(), "[%s] Clear SSL network buffer"::formatted);
255-
cleanNetworkBuffer(receivedBuffer);
230+
log.debug(remoteAddress, "[%s] Clear SSL network buffer"::formatted);
231+
NetworkUtils.cleanNetworkBuffer(receivedBuffer);
256232
return total;
257233
}
258234

235+
protected ByteBuffer moveDataToNetworkBuffer(ByteBuffer readingBuffer) {
236+
String remoteAddress = remoteAddress();
237+
logAcceptedNewDataPart(remoteAddress, readingBuffer);
238+
239+
ByteBuffer sslNetworkBuffer = sslNetworkBuffer();
240+
int availableSpace = sslNetworkBuffer.capacity() - sslNetworkBuffer.limit();
241+
if (availableSpace >= readingBuffer.limit()) {
242+
BufferUtils.appendAndClear(sslNetworkBuffer, readingBuffer);
243+
} else {
244+
sslNetworkBuffer = increaseNetworkBuffer(readingBuffer.limit());
245+
BufferUtils.appendAndClear(sslNetworkBuffer, readingBuffer);
246+
}
247+
248+
logPendingNetworkData(remoteAddress, sslNetworkBuffer);
249+
return sslNetworkBuffer;
250+
}
251+
259252
protected synchronized ByteBuffer increaseNetworkBuffer(int extra) {
260253
ByteBuffer current = sslNetworkBuffer();
261254
int newSize = (int) Math.max(current.capacity() * 1.3, current.capacity() + extra);
@@ -287,7 +280,28 @@ public void close() {
287280
super.close();
288281
}
289282

290-
protected static void cleanNetworkBuffer(ByteBuffer networkBuffer) {
291-
networkBuffer.clear().limit(0);
283+
private static void logDataBeforeUnwrap(String remoteAddress, ByteBuffer networkBuffer) {
284+
log.debug(remoteAddress, networkBuffer,
285+
(address, buff) -> "[%s] Try to unwrap data:\n%s".formatted(address, hexDump(buff)));
286+
}
287+
288+
private static void logDataBeforeDecrypt(String remoteAddress, ByteBuffer receivedBuffer) {
289+
log.debug(remoteAddress, receivedBuffer,
290+
(address, buf) -> "[%s] Try to decrypt data:\n%s".formatted(address, hexDump(buf)));
291+
}
292+
293+
private static void logDataAfterDecrypt(String remoteAddress, ByteBuffer sslDataBuffer) {
294+
log.debug(remoteAddress, sslDataBuffer,
295+
(address, buf) -> "[%s] Decrypted data:\n%s".formatted(address, hexDump(buf)));
296+
}
297+
298+
private static void logAcceptedNewDataPart(String remoteAddress, ByteBuffer readingBuffer) {
299+
log.debug(remoteAddress, readingBuffer,
300+
(address, buf) -> "[%s] Append new part of received data:\n%s".formatted(address, hexDump(buf)));
301+
}
302+
303+
private static void logPendingNetworkData(String remoteAddress, ByteBuffer sslNetworkBuffer) {
304+
log.debug(remoteAddress, sslNetworkBuffer,
305+
(address, buf) -> "[%s] Result pending received network data:\n%s".formatted(address, hexDump(buf)));
292306
}
293307
}

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketWriter.java

Lines changed: 40 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -95,36 +95,45 @@ protected boolean tryToSendNextPacketImpl() {
9595

9696
@Override
9797
protected ByteBuffer serialize(WritableNetworkPacket packet) {
98-
if (packet instanceof SslWritableNetworkPacket) {
98+
if (packet instanceof SslWrapRequestPacket) {
9999
return EMPTY_BUFFER;
100100
}
101101

102+
String remoteAddress = remoteAddress();
102103
ByteBuffer serialized = super.serialize(packet);
103-
log.debug(remoteAddress(), serialized, (address, buff) ->
104-
"[%s] Try to encrypt data:\n%s".formatted(address, hexDump(buff)));
104+
logDataBeforeEncrypt(remoteAddress, serialized);
105105

106106
ByteBuffer bufferToSend = sslNetworkBuffer();
107107
SSLEngineResult result = tryEncrypt(serialized, bufferToSend);
108108

109109
if (result.getStatus() == SSLEngineResult.Status.OK && serialized.hasRemaining()) {
110+
log.debug(remoteAddress, serialized.remaining(),
111+
"[%s] Has remaining [%s] bytes after encrypting, will create temp big buffer"::formatted);
112+
110113
int tempBufferSize = (int) ((bufferToSend.limit() + serialized.remaining()) * 1.2);
111114
ByteBuffer tempBuffer = bufferAllocator.takeBuffer(tempBufferSize);
112115
tempBuffer.put(bufferToSend.flip());
116+
113117
while (serialized.hasRemaining()) {
114118
result = tryEncrypt(serialized, bufferToSend);
115119
if (result.getStatus() != SSLEngineResult.Status.OK) {
116120
break;
117121
}
118122
tempBuffer.put(bufferToSend.flip());
119123
}
124+
120125
bufferToSend = tempBuffer;
121126
this.sslTempBuffer = tempBuffer;
122127
}
123128

124129
return switch (result.getStatus()) {
125-
case BUFFER_UNDERFLOW -> increaseAndTryAgain(packet);
126-
case BUFFER_OVERFLOW -> throw new IllegalStateException("Unexpected ssl engine result");
127-
case OK -> bufferToSend.flip();
130+
case BUFFER_UNDERFLOW, BUFFER_OVERFLOW ->
131+
throw new IllegalStateException("Unexpected ssl engine result");
132+
case OK -> {
133+
bufferToSend.flip();
134+
logEncryptedData(remoteAddress, bufferToSend);
135+
yield bufferToSend;
136+
}
128137
case CLOSED -> closeAndReturn();
129138
};
130139
}
@@ -134,12 +143,6 @@ protected ByteBuffer closeAndReturn() {
134143
return EMPTY_BUFFER;
135144
}
136145

137-
protected ByteBuffer increaseAndTryAgain(WritableNetworkPacket packet) {
138-
log.debug(remoteAddress(), "[%s] Increase network buffer and try again..."::formatted);
139-
increaseNetworkBuffer();
140-
return serialize(packet);
141-
}
142-
143146
protected SSLEngineResult tryEncrypt(ByteBuffer source, ByteBuffer destination) {
144147
try {
145148
return sslEngine.wrap(source, destination.clear());
@@ -151,14 +154,15 @@ protected SSLEngineResult tryEncrypt(ByteBuffer source, ByteBuffer destination)
151154

152155
@Nullable
153156
protected ByteBuffer doHandshake(HandshakeStatus handshakeStatus) {
157+
String remoteAddress = remoteAddress();
154158
while (SslUtils.needToProcess(handshakeStatus)) {
155-
log.debug(remoteAddress(), handshakeStatus, "[%s] Do handshake with status:[%s] "::formatted);
159+
log.debug(remoteAddress, handshakeStatus, "[%s] Do handshake with status:[%s] "::formatted);
160+
ByteBuffer sslNetworkBuffer = sslNetworkBuffer();
156161
switch (handshakeStatus) {
157162
case NEED_WRAP: {
158163
SSLEngineResult result;
159164
try {
160165
result = sslEngine.wrap(EMPTY_BUFFERS, sslNetworkBuffer.clear());
161-
handshakeStatus = result.getHandshakeStatus();
162166
} catch (SSLException sslException) {
163167
log.error("A problem was encountered while processing the data that caused the SSLEngine "
164168
+ "to abort. Will try to properly close connection...");
@@ -168,16 +172,11 @@ protected ByteBuffer doHandshake(HandshakeStatus handshakeStatus) {
168172
}
169173
switch (result.getStatus()) {
170174
case OK:
171-
log.debug(remoteAddress(), sslNetworkBuffer, result,
172-
(address, buf, res) -> "[%s] Send wrapped data:\n%s".formatted(address, hexDump(buf, res)));
173-
return sslNetworkBuffer.flip();
174-
case BUFFER_OVERFLOW: {
175-
log.debug(remoteAddress(), "[%s] Increase network buffer"::formatted);
176-
increaseNetworkBuffer();
177-
break;
178-
}
179-
case BUFFER_UNDERFLOW: {
180-
throw new IllegalStateException("Unexpected ssl engine result");
175+
sslNetworkBuffer.flip();
176+
logWrappedData(remoteAddress, sslNetworkBuffer, result);
177+
return sslNetworkBuffer;
178+
case BUFFER_OVERFLOW, BUFFER_UNDERFLOW: {
179+
throw new RuntimeException("Unexpected SSL result:" + result);
181180
}
182181
case CLOSED: {
183182
try {
@@ -189,7 +188,7 @@ protected ByteBuffer doHandshake(HandshakeStatus handshakeStatus) {
189188
break;
190189
}
191190
default: {
192-
throw new IllegalStateException("Invalid SSL status: " + result.getStatus());
191+
throw new IllegalStateException("Invalid SSL result:" + result);
193192
}
194193
}
195194
break;
@@ -202,7 +201,7 @@ protected ByteBuffer doHandshake(HandshakeStatus handshakeStatus) {
202201
break;
203202
}
204203
default: {
205-
throw new IllegalStateException("Invalid SSL status: " + handshakeStatus);
204+
throw new IllegalStateException("Invalid SSL status:" + handshakeStatus);
206205
}
207206
}
208207
}
@@ -220,15 +219,25 @@ protected void clearTempBuffers() {
220219
}
221220
}
222221

223-
protected synchronized void increaseNetworkBuffer() {
224-
sslNetworkBuffer = NetworkUtils
225-
.increasePacketBuffer(sslNetworkBuffer, bufferAllocator, sslEngine);
226-
}
227-
228222
@Override
229223
public void close() {
230224
sslEngine.closeOutbound();
231225
bufferAllocator.putBuffer(sslNetworkBuffer);
232226
super.close();
233227
}
228+
229+
private static void logDataBeforeEncrypt(String remoteAddress, ByteBuffer serialized) {
230+
log.debug(remoteAddress, serialized,
231+
(address, buff) -> "[%s] Try to encrypt data:\n%s".formatted(address, hexDump(buff)));
232+
}
233+
234+
private static void logEncryptedData(String remoteAddress, ByteBuffer bufferToSend) {
235+
log.debug(remoteAddress, bufferToSend,
236+
(address, buff) -> "[%s] Result encrypted data:\n%s".formatted(address, hexDump(buff)));
237+
}
238+
239+
private static void logWrappedData(String remoteAddress, ByteBuffer sslNetworkBuffer, SSLEngineResult result) {
240+
log.debug(remoteAddress, sslNetworkBuffer, result,
241+
(address, buf, res) -> "[%s] Send wrapped data:\n%s".formatted(address, hexDump(buf, res)));
242+
}
234243
}

0 commit comments

Comments
 (0)