Skip to content

Commit 34de84b

Browse files
committed
IGNITE-28473 Concurrent message serialization in NIO workers causes performance degradation
1 parent 74af609 commit 34de84b

2 files changed

Lines changed: 179 additions & 3 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java

Lines changed: 164 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.Iterator;
4343
import java.util.List;
4444
import java.util.Map;
45+
import java.util.Optional;
4546
import java.util.Map.Entry;
4647
import java.util.Set;
4748
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -93,6 +94,8 @@
9394
import org.apache.ignite.thread.IgniteThread;
9495
import org.jetbrains.annotations.Nullable;
9596

97+
import static java.lang.System.arraycopy;
98+
import static java.util.Optional.empty;
9699
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
97100
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
98101
import static org.apache.ignite.internal.processors.tracing.SpanTags.SOCKET_WRITE_BYTES;
@@ -647,23 +650,90 @@ IgniteInternalFuture<?> send(GridNioSession ses,
647650

648651
GridSelectorNioSessionImpl impl = (GridSelectorNioSessionImpl)ses;
649652

653+
byte[] serialized = serializeMessage(ses, msg).orElse(null);
654+
650655
if (createFut) {
651656
NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, NioOperation.REQUIRE_WRITE, msg,
652657
skipRecoveryPred.apply(msg), ackC);
653658

659+
fut.serializedMessage(serialized);
660+
654661
send0(impl, fut, false);
655662

656663
return fut;
657664
}
658665
else {
659-
SessionWriteRequest req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg), ackC);
666+
var req = new WriteRequestImpl(ses, msg, skipRecoveryPred.apply(msg), ackC, serialized);
660667

661668
send0(impl, req, false);
662669

663670
return null;
664671
}
665672
}
666673

674+
/**
675+
* Serializes message to byte array eagerly in the calling thread.
676+
*
677+
* @param ses Session.
678+
* @param msg Message to serialize.
679+
* @return Serialized message bytes, or empty if eager serialization is not available.
680+
*/
681+
private Optional<byte[]> serializeMessage(GridNioSession ses, Message msg) {
682+
if (writerFactory == null || msgFactory == null)
683+
return empty();
684+
685+
MessageWriter writer;
686+
MessageSerializer<Message> msgSer;
687+
688+
try {
689+
writer = writerFactory.writer(ses);
690+
msgSer = msgFactory.serializer(msg.directType());
691+
}
692+
catch (Exception e) {
693+
if (log.isDebugEnabled())
694+
log.debug("Failed to prepare eager serialization, will use lazy path [msg=" + msg + ", err=" + e + ']');
695+
696+
return empty();
697+
}
698+
699+
try {
700+
int capacity = 4096;
701+
var buf = ByteBuffer.allocate(capacity);
702+
703+
while (true) {
704+
writer.setBuffer(buf);
705+
706+
if (msgSer.writeTo(msg, writer)) {
707+
writer.reset();
708+
709+
int len = buf.position();
710+
var result = new byte[len];
711+
712+
arraycopy(buf.array(), 0, result, 0, len);
713+
714+
return Optional.of(result);
715+
}
716+
717+
int pos = buf.position();
718+
719+
capacity *= 2;
720+
721+
var newBuf = ByteBuffer.allocate(capacity);
722+
723+
arraycopy(buf.array(), 0, newBuf.array(), 0, pos);
724+
725+
newBuf.position(pos);
726+
727+
buf = newBuf;
728+
}
729+
}
730+
catch (Exception e) {
731+
log.warning("Failed to eagerly serialize message, will use lazy path [msg=" + msg + ']', e);
732+
733+
return empty();
734+
}
735+
}
736+
667737
/**
668738
* @param ses Session.
669739
* @param req Request.
@@ -1614,7 +1684,21 @@ private boolean writeToBuffer(
16141684

16151685
int startPos = buf.position();
16161686

1617-
if (messageFactory() == null) {
1687+
byte[] serialized = req.serializedMessage();
1688+
1689+
if (serialized != null) {
1690+
int off = req.serializedOffset();
1691+
int len = Math.min(serialized.length - off, buf.remaining());
1692+
1693+
buf.put(serialized, off, len);
1694+
off += len;
1695+
1696+
finished = off >= serialized.length;
1697+
1698+
if (!finished)
1699+
req.serializedOffset(off);
1700+
}
1701+
else if (messageFactory() == null) {
16181702
assert msg instanceof ClientMessage; // TODO: Will refactor in IGNITE-26554.
16191703

16201704
finished = ((ClientMessage)msg).writeTo(buf);
@@ -1815,7 +1899,21 @@ private boolean writeToBuffer(GridSelectorNioSessionImpl ses, ByteBuffer buf, Se
18151899

18161900
int startPos = buf.position();
18171901

1818-
if (msgFactory == null) {
1902+
byte[] serialized = req.serializedMessage();
1903+
1904+
if (serialized != null) {
1905+
int off = req.serializedOffset();
1906+
int len = Math.min(serialized.length - off, buf.remaining());
1907+
1908+
buf.put(serialized, off, len);
1909+
off += len;
1910+
1911+
finished = off >= serialized.length;
1912+
1913+
if (!finished)
1914+
req.serializedOffset(off);
1915+
}
1916+
else if (msgFactory == null) {
18191917
assert msg instanceof ClientMessage; // TODO: Will refactor in IGNITE-26554.
18201918

18211919
finished = ((ClientMessage)msg).writeTo(buf);
@@ -3378,6 +3476,12 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang
33783476
/** Span for tracing. */
33793477
private Span span;
33803478

3479+
/** Pre-serialized message bytes. */
3480+
private final byte[] serializedMsg;
3481+
3482+
/** Current offset in pre-serialized bytes for partial writes. */
3483+
private int serializedOff;
3484+
33813485
/**
33823486
* @param ses Session.
33833487
* @param msg Message.
@@ -3388,11 +3492,27 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang
33883492
Object msg,
33893493
boolean skipRecovery,
33903494
IgniteInClosure<IgniteException> ackC) {
3495+
this(ses, msg, skipRecovery, ackC, null);
3496+
}
3497+
3498+
/**
3499+
* @param ses Session.
3500+
* @param msg Message.
3501+
* @param skipRecovery Skip recovery flag.
3502+
* @param ackC Closure invoked when message ACK is received.
3503+
* @param serializedMsg Pre-serialized message bytes.
3504+
*/
3505+
WriteRequestImpl(GridNioSession ses,
3506+
Object msg,
3507+
boolean skipRecovery,
3508+
IgniteInClosure<IgniteException> ackC,
3509+
byte[] serializedMsg) {
33913510
this.ses = ses;
33923511
this.msg = msg;
33933512
this.skipRecovery = skipRecovery;
33943513
this.ackC = ackC;
33953514
this.span = MTC.span();
3515+
this.serializedMsg = serializedMsg;
33963516
}
33973517

33983518
/** {@inheritDoc} */
@@ -3450,6 +3570,21 @@ static final class WriteRequestImpl implements SessionWriteRequest, SessionChang
34503570
return span;
34513571
}
34523572

3573+
/** {@inheritDoc} */
3574+
@Override public byte[] serializedMessage() {
3575+
return serializedMsg;
3576+
}
3577+
3578+
/** {@inheritDoc} */
3579+
@Override public int serializedOffset() {
3580+
return serializedOff;
3581+
}
3582+
3583+
/** {@inheritDoc} */
3584+
@Override public void serializedOffset(int off) {
3585+
serializedOff = off;
3586+
}
3587+
34533588
/** {@inheritDoc} */
34543589
@Override public String toString() {
34553590
return S.toString(WriteRequestImpl.class, this);
@@ -3496,6 +3631,12 @@ private static class NioOperationFuture<R> extends GridFutureAdapter<R> implemen
34963631
/** */
34973632
private IgniteInClosure<IgniteException> ackC;
34983633

3634+
/** Pre-serialized message bytes. */
3635+
private byte[] serializedMsg;
3636+
3637+
/** Current offset in pre-serialized bytes for partial writes. */
3638+
private int serializedOff;
3639+
34993640
/**
35003641
* @param sockCh Socket channel.
35013642
* @param accepted {@code True} if socket has been accepted.
@@ -3664,6 +3805,26 @@ boolean accepted() {
36643805
return skipRecovery;
36653806
}
36663807

3808+
/** {@inheritDoc} */
3809+
@Override public byte[] serializedMessage() {
3810+
return serializedMsg;
3811+
}
3812+
3813+
/** {@inheritDoc} */
3814+
@Override public int serializedOffset() {
3815+
return serializedOff;
3816+
}
3817+
3818+
/** {@inheritDoc} */
3819+
@Override public void serializedOffset(int off) {
3820+
serializedOff = off;
3821+
}
3822+
3823+
/** Sets pre-serialized message bytes. */
3824+
void serializedMessage(byte[] serializedMsg) {
3825+
this.serializedMsg = serializedMsg;
3826+
}
3827+
36673828
/** {@inheritDoc} */
36683829
@Override public String toString() {
36693830
return S.toString(NioOperationFuture.class, this);

modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,19 @@ public interface SessionWriteRequest {
7878
* @return Span.
7979
*/
8080
Span span();
81+
82+
/** @return Pre-serialized message bytes, or {@code null} if not pre-serialized. */
83+
default byte[] serializedMessage() {
84+
return null;
85+
}
86+
87+
/** @return Offset in serialized bytes from which to continue writing. */
88+
default int serializedOffset() {
89+
return 0;
90+
}
91+
92+
/** @param off New offset after partial write. */
93+
default void serializedOffset(int off) {
94+
// No-op.
95+
}
8196
}

0 commit comments

Comments
 (0)