Skip to content

Commit b5bc8f9

Browse files
committed
continue fix ssl network implementation
1 parent 6015fa8 commit b5bc8f9

4 files changed

Lines changed: 66 additions & 32 deletions

File tree

rlib-network/src/loadTest/java/javasabr/rlib/network/StringSslNetworkLoadTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ void connectAndSendMessages(
8282
int delay = random.nextInt(MAX_SEND_DELAY);
8383
ScheduledFuture<?> schedule = executor.schedule(
8484
() -> {
85-
StringWritableNetworkPacket message = newMessage(12000, 15000); // 10240
85+
StringWritableNetworkPacket message = newMessage(10, 100); // 10240
8686
connection.send(message);
8787
}, delay, TimeUnit.MILLISECONDS);
8888
tasks.add(schedule);

rlib-network/src/main/java/javasabr/rlib/network/packet/WritableNetworkPacket.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.nio.ByteBuffer;
44

5+
56
/**
67
* Interface to implement a writable packet.
78
*

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,15 @@
3434
public abstract class AbstractNetworkPacketWriter<W extends WritableNetworkPacket, C extends Connection<?, W>>
3535
implements NetworkPacketWriter {
3636

37-
final CompletionHandler<Integer, WritableNetworkPacket> writeHandler = new CompletionHandler<>() {
37+
final CompletionHandler<Integer, @Nullable WritableNetworkPacket> writeHandler = new CompletionHandler<>() {
3838

3939
@Override
40-
public void completed(Integer result, WritableNetworkPacket packet) {
40+
public void completed(Integer result, @Nullable WritableNetworkPacket packet) {
4141
handleSuccessfulWritingData(result, packet);
4242
}
4343

4444
@Override
45-
public void failed(Throwable exc, WritableNetworkPacket packet) {
45+
public void failed(Throwable exc, @Nullable WritableNetworkPacket packet) {
4646
handleFailedWritingData(exc, packet);
4747
}
4848
};
@@ -64,7 +64,7 @@ public void failed(Throwable exc, WritableNetworkPacket packet) {
6464
volatile ByteBuffer writingBuffer = EMPTY_BUFFER;
6565

6666
final Runnable updateActivityFunction;
67-
final Supplier<@Nullable WritableNetworkPacket> nextWritePacketSupplier;
67+
final Supplier<@Nullable WritableNetworkPacket> writablePacketProvider;
6868
final ObjBoolConsumer<WritableNetworkPacket> sentPacketHandler;
6969
final Consumer<WritableNetworkPacket> serializedToChannelPacketHandler;
7070

@@ -82,7 +82,7 @@ public AbstractNetworkPacketWriter(
8282
this.firstWriteBuffer = bufferAllocator.takeWriteBuffer();
8383
this.secondWriteBuffer = bufferAllocator.takeWriteBuffer();
8484
this.updateActivityFunction = updateActivityFunction;
85-
this.nextWritePacketSupplier = packetProvider;
85+
this.writablePacketProvider = packetProvider;
8686
this.serializedToChannelPacketHandler = serializedToChannelPacketHandler;
8787
this.sentPacketHandler = sentPacketHandler;
8888
}
@@ -97,29 +97,44 @@ public boolean tryToSendNextPacket() {
9797
return false;
9898
}
9999

100-
WritableNetworkPacket nextPacket = nextWritePacketSupplier.get();
101-
if (nextPacket == null) {
102-
writing.set(false);
103-
return false;
100+
boolean startedWriting = false;
101+
102+
try {
103+
startedWriting = tryToSendNextPacketImpl();
104+
} catch (Exception ex) {
105+
log.error(ex);
104106
}
105107

106-
boolean startedWriting = false;
107-
ByteBuffer resultBuffer = serialize(nextPacket);
108-
109-
if (resultBuffer.limit() != 0) {
110-
writingBuffer = resultBuffer;
111-
log.debug(remoteAddress(), resultBuffer,
112-
(address, buff) -> "[%s] Write to channel data:\n" + hexDump(buff));
113-
socketChannel.write(resultBuffer, nextPacket, writeHandler);
114-
startedWriting = true;
115-
} else {
108+
if (!startedWriting) {
116109
writing.set(false);
117110
}
118111

119-
serializedToChannelPacketHandler.accept(nextPacket);
120112
return startedWriting;
121113
}
122114

115+
protected boolean tryToSendNextPacketImpl() {
116+
for (var nextPacket = writablePacketProvider.get();
117+
nextPacket != null; nextPacket = writablePacketProvider.get()) {
118+
ByteBuffer resultBuffer = serialize(nextPacket);
119+
boolean startedWriting = writeBuffer(resultBuffer, nextPacket);
120+
serializedToChannelPacketHandler.accept(nextPacket);
121+
if (startedWriting) {
122+
return true;
123+
}
124+
}
125+
return false;
126+
}
127+
128+
protected boolean writeBuffer(ByteBuffer resultBuffer, @Nullable WritableNetworkPacket nextPacket) {
129+
if (resultBuffer.limit() == 0) {
130+
return false;
131+
}
132+
writingBuffer = resultBuffer;
133+
log.debug(remoteAddress(), resultBuffer, (address, buff) -> "[%s] Write to channel data:\n" + hexDump(buff));
134+
socketChannel.write(resultBuffer, nextPacket, writeHandler);
135+
return true;
136+
}
137+
123138
/**
124139
* Serializes the network packet to buffer.
125140
*
@@ -324,11 +339,13 @@ protected ByteBuffer writeHeader(ByteBuffer buffer, int value, int headerSize) {
324339
* @param wroteBytes the count of wrote bytes.
325340
* @param packet the sent packet.
326341
*/
327-
protected void handleSuccessfulWritingData(Integer wroteBytes, WritableNetworkPacket packet) {
342+
protected void handleSuccessfulWritingData(Integer wroteBytes, @Nullable WritableNetworkPacket packet) {
328343
updateActivityFunction.run();
329344

330345
if (wroteBytes == -1) {
331-
sentPacketHandler.accept(packet, false);
346+
if (packet != null) {
347+
sentPacketHandler.accept(packet, false);
348+
}
332349
connection.close();
333350
return;
334351
}
@@ -343,13 +360,12 @@ protected void handleSuccessfulWritingData(Integer wroteBytes, WritableNetworkPa
343360
log.debug(remoteAddress(), wroteBytes, "[%s] Finished writing [%s] bytes"::formatted);
344361
}
345362

346-
sentPacketHandler.accept(packet, true);
363+
if (packet != null) {
364+
sentPacketHandler.accept(packet, true);
365+
}
347366

348367
if (writing.compareAndSet(true, false)) {
349-
// if we have temp buffers, we can remove it after finishing writing a packet
350-
if (firstWriteTempBuffer != null || secondWriteTempBuffer != null) {
351-
clearTempBuffers();
352-
}
368+
clearTempBuffers();
353369
tryToSendNextPacket();
354370
}
355371
}
@@ -360,14 +376,15 @@ protected void handleSuccessfulWritingData(Integer wroteBytes, WritableNetworkPa
360376
* @param exception the exception.
361377
* @param packet the packet.
362378
*/
363-
protected void handleFailedWritingData(Throwable exception, WritableNetworkPacket packet) {
379+
protected void handleFailedWritingData(Throwable exception, @Nullable WritableNetworkPacket packet) {
364380
log.error(new RuntimeException("Failed writing packet:" + packet, exception));
365381
if (exception instanceof IOException) {
366382
connection.close();
367383
return;
368384
}
369385
if (!connection.closed()) {
370386
if (writing.compareAndSet(true, false)) {
387+
clearTempBuffers();
371388
tryToSendNextPacket();
372389
}
373390
}
@@ -383,6 +400,7 @@ public void close() {
383400
}
384401

385402
protected void clearTempBuffers() {
403+
this.writingBuffer = EMPTY_BUFFER;
386404

387405
var firstWriteTempBuffer = this.firstWriteTempBuffer;
388406
if (firstWriteTempBuffer != null) {

rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketWriter.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,21 @@ public boolean tryToSendNextPacket() {
7878
return super.tryToSendNextPacket();
7979
}
8080

81+
@Override
82+
protected boolean tryToSendNextPacketImpl() {
83+
HandshakeStatus status = sslEngine.getHandshakeStatus();
84+
if (SslUtils.isReadyToCrypt(status)) {
85+
return super.tryToSendNextPacketImpl();
86+
}
87+
88+
ByteBuffer dataToSend = doHandshake(SslWritableNetworkPacket.getInstance());
89+
if (dataToSend != null) {
90+
return writeBuffer(dataToSend, null);
91+
}
92+
93+
throw new IllegalStateException("Unexpected SSL state");
94+
}
95+
8196
@Override
8297
protected ByteBuffer serialize(WritableNetworkPacket packet) {
8398
HandshakeStatus status = sslEngine.getHandshakeStatus();
@@ -158,7 +173,6 @@ protected ByteBuffer doHandshake(WritableNetworkPacket packet) {
158173
switch (handshakeStatus) {
159174
case NEED_WRAP: {
160175
try {
161-
// check result
162176
result = sslEngine.wrap(EMPTY_BUFFERS, sslNetworkBuffer.clear());
163177
handshakeStatus = result.getHandshakeStatus();
164178
} catch (SSLException sslException) {
@@ -168,13 +182,14 @@ protected ByteBuffer doHandshake(WritableNetworkPacket packet) {
168182
handshakeStatus = sslEngine.getHandshakeStatus();
169183
break;
170184
}
185+
171186
switch (result.getStatus()) {
172187
case OK:
173188
sslNetworkBuffer.flip();
174-
if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
189+
/* if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
175190
log.debug("Send command to wrap data again");
176191
queueAtFirst.accept(SslWritableNetworkPacket.getInstance());
177-
}
192+
}*/
178193
log.debug(sslNetworkBuffer, result, (buf, res) -> "Send wrapped data:\n" + hexDump(buf, res));
179194
return sslNetworkBuffer;
180195
case BUFFER_OVERFLOW: {

0 commit comments

Comments
 (0)