@@ -62,7 +62,8 @@ public WritablePacketWithFeedback(CompletableFuture<Boolean> attachment, Writabl
6262 final StampedLock lock ;
6363 final AtomicBoolean closed ;
6464
65- final MutableArray <BiConsumer <C , ? super ReadableNetworkPacket <C >>> subscribers ;
65+ final MutableArray <BiConsumer <C , ? super ReadableNetworkPacket <C >>> validPacketSubscribers ;
66+ final MutableArray <BiConsumer <C , ? super ReadableNetworkPacket <C >>> invalidPacketSubscribers ;
6667
6768 final int maxPacketsByRead ;
6869
@@ -81,7 +82,8 @@ public AbstractConnection(
8182 this .pendingPackets = DequeFactory .arrayBasedBased (WritableNetworkPacket .class );
8283 this .network = network ;
8384 this .closed = new AtomicBoolean (false );
84- this .subscribers = ArrayFactory .copyOnModifyArray (BiConsumer .class );
85+ this .validPacketSubscribers = ArrayFactory .copyOnModifyArray (BiConsumer .class );
86+ this .invalidPacketSubscribers = ArrayFactory .copyOnModifyArray (BiConsumer .class );
8587 this .remoteAddress = String .valueOf (NetworkUtils .getRemoteAddress (channel ));
8688 }
8789
@@ -93,8 +95,14 @@ public void onConnected() {}
9395 protected abstract NetworkPacketWriter packetWriter ();
9496
9597 @ Override
96- public void onReceive (BiConsumer <C , ? super ReadableNetworkPacket <C >> consumer ) {
97- subscribers .add (consumer );
98+ public void onReceiveValidPacket (BiConsumer <C , ? super ReadableNetworkPacket <C >> consumer ) {
99+ validPacketSubscribers .add (consumer );
100+ packetReader ().startRead ();
101+ }
102+
103+ @ Override
104+ public void onReceiveInvalidPacket (BiConsumer <C , ? super ReadableNetworkPacket <C >> consumer ) {
105+ invalidPacketSubscribers .add (consumer );
98106 packetReader ().startRead ();
99107 }
100108
@@ -104,26 +112,43 @@ public void onReceive(BiConsumer<C, ? super ReadableNetworkPacket<C>> consumer)
104112 }
105113
106114 @ Override
107- public Flux <? extends ReadableNetworkPacket <C >> receivedPackets () {
108- return Flux .create (this ::registerFluxOnReceivedPackets );
115+ public Flux <? extends ReadableNetworkPacket <C >> receivedValidPackets () {
116+ return Flux .create (this ::registerFluxOnReceivedValidPackets );
117+ }
118+
119+ @ Override
120+ public Flux <? extends ReadableNetworkPacket <C >> receivedInvalidPackets () {
121+ return Flux .create (this ::registerFluxOnReceivedInvalidPackets );
109122 }
110123
111124 protected void registerFluxOnReceivedEvents (
112125 FluxSink <ReceivedPacketEvent <C , ? extends ReadableNetworkPacket <C >>> sink ) {
113126
114- BiConsumer <C , ReadableNetworkPacket <C >> listener =
127+ BiConsumer <C , ReadableNetworkPacket <C >> validListener =
115128 (connection , packet ) -> sink .next (new ReceivedPacketEvent <>(connection ,
116- packet ));
129+ packet , true ));
130+ BiConsumer <C , ReadableNetworkPacket <C >> invalidListener =
131+ (connection , packet ) -> sink .next (new ReceivedPacketEvent <>(connection ,
132+ packet , true ));
133+
117134
118- onReceive (listener );
135+ onReceiveValidPacket (validListener );
136+ onReceiveInvalidPacket (invalidListener );
119137
120- sink .onDispose (() -> subscribers .remove (listener ));
138+ sink .onDispose (() -> validPacketSubscribers .remove (validListener ));
139+ sink .onDispose (() -> invalidPacketSubscribers .remove (invalidListener ));
121140 }
122141
123- protected void registerFluxOnReceivedPackets (FluxSink <? super ReadableNetworkPacket <C >> sink ) {
142+ protected void registerFluxOnReceivedValidPackets (FluxSink <? super ReadableNetworkPacket <C >> sink ) {
124143 BiConsumer <C , ReadableNetworkPacket <C >> listener = (connection , packet ) -> sink .next (packet );
125- onReceive (listener );
126- sink .onDispose (() -> subscribers .remove (listener ));
144+ onReceiveValidPacket (listener );
145+ sink .onDispose (() -> validPacketSubscribers .remove (listener ));
146+ }
147+
148+ protected void registerFluxOnReceivedInvalidPackets (FluxSink <? super ReadableNetworkPacket <C >> sink ) {
149+ BiConsumer <C , ReadableNetworkPacket <C >> listener = (connection , packet ) -> sink .next (packet );
150+ onReceiveInvalidPacket (listener );
151+ sink .onDispose (() -> invalidPacketSubscribers .remove (listener ));
127152 }
128153
129154 @ Nullable
@@ -169,9 +194,16 @@ public boolean closed() {
169194
170195 protected void serializedPacket (WritableNetworkPacket <?> packet ) {}
171196
172- protected void handleReceivedPacket (ReadableNetworkPacket <C > packet ) {
173- log .debug (packet , remoteAddress , "Handle received packet:[%s] from:[%s]" ::formatted );
174- subscribers
197+ protected void handleReceivedValidPacket (ReadableNetworkPacket <C > packet ) {
198+ log .debug (packet , remoteAddress , "Handle received valid packet:[%s] from:[%s]" ::formatted );
199+ validPacketSubscribers
200+ .iterations ()
201+ .forEach ((C ) this , packet , BiConsumer ::accept );
202+ }
203+
204+ protected void handleReceivedInvalidPacket (ReadableNetworkPacket <C > packet ) {
205+ log .debug (packet , remoteAddress , "Handle failed received packet:[%s] from:[%s]" ::formatted );
206+ invalidPacketSubscribers
175207 .iterations ()
176208 .forEach ((C ) this , packet , BiConsumer ::accept );
177209 }
0 commit comments