Skip to content

Commit 72fd070

Browse files
authored
Fix duplicate MONITOR listener invocation after async packet processing (#3630)
1 parent 5a9afed commit 72fd070

6 files changed

Lines changed: 87 additions & 66 deletions

File tree

src/main/java/com/comphenix/protocol/async/AsyncFilterManager.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.comphenix.protocol.events.PacketListener;
2828
import com.comphenix.protocol.injector.collection.InboundPacketListenerSet;
2929
import com.comphenix.protocol.injector.collection.OutboundPacketListenerSet;
30+
import com.comphenix.protocol.injector.netty.Injector;
3031
import com.comphenix.protocol.scheduler.ProtocolScheduler;
3132
import com.google.common.base.Objects;
3233
import com.google.common.collect.ImmutableList;
@@ -274,7 +275,7 @@ void unregisterAsyncHandlerInternal(AsyncListenerHandler handler) {
274275
* @return TRUE if we are, FALSE otherwise.
275276
*/
276277
private boolean onMainThread() {
277-
return Thread.currentThread().getId() == mainThread.getId();
278+
return Thread.currentThread() == mainThread;
278279
}
279280

280281
@Override
@@ -347,22 +348,22 @@ public boolean hasAsynchronousListeners(PacketEvent packet) {
347348
* Construct a asynchronous marker with all the default values.
348349
* @return Asynchronous marker.
349350
*/
350-
public AsyncMarker createAsyncMarker() {
351-
return createAsyncMarker(AsyncMarker.DEFAULT_TIMEOUT_DELTA);
351+
public AsyncMarker createAsyncMarker(Injector injector) {
352+
return createAsyncMarker(injector, AsyncMarker.DEFAULT_TIMEOUT_DELTA);
352353
}
353354

354355
/**
355356
* Construct an async marker with the given sending priority delta and timeout delta.
356357
* @param timeoutDelta - how long (in ms) until the packet expire.
357358
* @return An async marker.
358359
*/
359-
public AsyncMarker createAsyncMarker(long timeoutDelta) {
360-
return createAsyncMarker(timeoutDelta, currentSendingIndex.incrementAndGet());
360+
public AsyncMarker createAsyncMarker(Injector injector, long timeoutDelta) {
361+
return createAsyncMarker(injector, timeoutDelta, currentSendingIndex.incrementAndGet());
361362
}
362363

363364
// Helper method
364-
private AsyncMarker createAsyncMarker(long timeoutDelta, long sendingIndex) {
365-
return new AsyncMarker(manager, sendingIndex, System.currentTimeMillis(), timeoutDelta);
365+
private AsyncMarker createAsyncMarker(Injector injector, long timeoutDelta, long sendingIndex) {
366+
return new AsyncMarker(injector, sendingIndex, System.currentTimeMillis(), timeoutDelta);
366367
}
367368

368369
@Override

src/main/java/com/comphenix/protocol/async/AsyncListenerHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ private boolean waitForStops() throws InterruptedException {
538538
*/
539539
private void listenerLoop(int workerID) {
540540
// Danger, danger!
541-
if (Thread.currentThread().getId() == mainThread.getId())
541+
if (Thread.currentThread() == mainThread)
542542
throw new IllegalStateException("Do not call this method from the main thread.");
543543
if (cancelled)
544544
throw new IllegalStateException("Listener has been cancelled. Create a new listener instead.");

src/main/java/com/comphenix/protocol/async/AsyncMarker.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,18 @@
2323
import java.lang.reflect.Method;
2424
import java.util.Iterator;
2525
import java.util.List;
26+
import java.util.Objects;
27+
import java.util.concurrent.atomic.AtomicBoolean;
2628
import java.util.concurrent.atomic.AtomicInteger;
2729
import java.util.logging.Level;
2830

2931
import com.comphenix.protocol.PacketStream;
3032
import com.comphenix.protocol.PacketType;
33+
import com.comphenix.protocol.ProtocolLibrary;
3134
import com.comphenix.protocol.ProtocolLogger;
3235
import com.comphenix.protocol.events.NetworkMarker;
3336
import com.comphenix.protocol.events.PacketEvent;
37+
import com.comphenix.protocol.injector.netty.Injector;
3438
import com.comphenix.protocol.reflect.FieldAccessException;
3539
import com.comphenix.protocol.reflect.FuzzyReflection;
3640
import com.comphenix.protocol.utility.MinecraftReflection;
@@ -64,7 +68,7 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
6468
/**
6569
* The packet stream responsible for transmitting the packet when it's done processing.
6670
*/
67-
private transient PacketStream packetStream;
71+
private final Injector injector;
6872

6973
/**
7074
* Current list of async packet listeners.
@@ -86,7 +90,7 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
8690
private volatile boolean processed;
8791

8892
// Whether or not the packet has been sent
89-
private volatile boolean transmitted;
93+
private final AtomicBoolean transmitted = new AtomicBoolean(false);
9094

9195
// Whether or not the asynchronous processing itself should be cancelled
9296
private volatile boolean asyncCancelled;
@@ -109,11 +113,8 @@ public class AsyncMarker implements Serializable, Comparable<AsyncMarker> {
109113
* Create a container for asyncronous packets.
110114
* @param initialTime - the current time in milliseconds since 01.01.1970 00:00.
111115
*/
112-
AsyncMarker(PacketStream packetStream, long sendingIndex, long initialTime, long timeoutDelta) {
113-
if (packetStream == null)
114-
throw new IllegalArgumentException("packetStream cannot be NULL");
115-
116-
this.packetStream = packetStream;
116+
AsyncMarker(Injector injector, long sendingIndex, long initialTime, long timeoutDelta) {
117+
this.injector = Objects.requireNonNull(injector, "injector is nul");
117118

118119
// Timeout
119120
this.initialTime = initialTime;
@@ -180,16 +181,18 @@ public void setNewSendingIndex(long newSendingIndex) {
180181
* Retrieve the packet stream responsible for transmitting this packet.
181182
* @return The packet stream.
182183
*/
184+
@Deprecated
183185
public PacketStream getPacketStream() {
184-
return packetStream;
186+
return ProtocolLibrary.getProtocolManager();
185187
}
186188

187189
/**
188190
* Sets the output packet stream responsible for transmitting this packet.
189191
* @param packetStream - new output packet stream.
190192
*/
193+
@Deprecated
191194
public void setPacketStream(PacketStream packetStream) {
192-
this.packetStream = packetStream;
195+
// NOOP
193196
}
194197

195198
/**
@@ -285,7 +288,7 @@ public void setProcessingLock(Object processingLock) {
285288
* @return TRUE if it has been sent before, FALSE otherwise.
286289
*/
287290
public boolean isTransmitted() {
288-
return transmitted;
291+
return transmitted.get();
289292
}
290293

291294
/**
@@ -383,13 +386,18 @@ void setListenerTraversal(Iterator<AsyncListenerHandler> listenerTraversal) {
383386
* @throws IOException If the packet couldn't be sent.
384387
*/
385388
void sendPacket(PacketEvent event) throws IOException {
389+
if (!this.transmitted.compareAndSet(false, true)) {
390+
return;
391+
}
392+
393+
Object handle = event.getPacket().getHandle();
394+
386395
if (event.isServerPacket()) {
387-
packetStream.sendServerPacket(event.getPlayer(), event.getPacket(), NetworkMarker.getNetworkMarker(event), false);
396+
NetworkMarker marker = NetworkMarker.getNetworkMarker(event);
397+
this.injector.sendClientboundPacket(handle, marker, false);
388398
} else {
389-
packetStream.receiveClientPacket(event.getPlayer(), event.getPacket(), NetworkMarker.getNetworkMarker(event),
390-
false);
399+
this.injector.readServerboundPacket(handle);
391400
}
392-
transmitted = true;
393401
}
394402

395403
/**

src/main/java/com/comphenix/protocol/injector/PacketFilterManager.java

Lines changed: 54 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.comphenix.protocol.injector.collection.InboundPacketListenerSet;
4444
import com.comphenix.protocol.injector.collection.OutboundPacketListenerSet;
4545
import com.comphenix.protocol.injector.collection.PacketListenerSet;
46+
import com.comphenix.protocol.injector.netty.Injector;
4647
import com.comphenix.protocol.injector.netty.WirePacket;
4748
import com.comphenix.protocol.injector.netty.manager.NetworkManagerInjector;
4849
import com.comphenix.protocol.injector.packet.PacketRegistry;
@@ -151,29 +152,20 @@ public void sendServerPacket(Player receiver, PacketContainer packet, boolean fi
151152

152153
@Override
153154
public void sendServerPacket(Player receiver, PacketContainer packet, NetworkMarker marker, boolean filters) {
154-
if (!this.closed) {
155-
// if we skip the packet events later when actually writing into the pipeline we at least notify all
156-
// monitor listeners before doing so - they will not be able to change the event tho
157-
if (!filters) {
158-
// ensure we are on the main thread if any listener requires that
159-
if (this.hasMainThreadListener(packet.getType()) && !this.server.isPrimaryThread()) {
160-
NetworkMarker copy = marker; // okay fine
161-
ProtocolLibrary.getScheduler().scheduleSyncDelayedTask(
162-
() -> this.sendServerPacket(receiver, packet, copy, false), 1L);
163-
return;
164-
}
165-
166-
// construct the event and post to all monitor listeners
155+
if (this.closed) {
156+
return;
157+
}
158+
159+
// A monitor listener should never modify a packet/event so we can simply invoke monitor listeners
160+
// independently of our injector pipeline
161+
if (!filters) {
162+
this.runMonitorListeners(packet, () -> {
167163
PacketEvent event = PacketEvent.fromServer(this, packet, marker, receiver, false);
168164
this.outboundListeners.invoke(event, ListenerPriority.MONITOR);
169-
170-
// update the marker of the event without accidentally constructing it
171-
marker = NetworkMarker.getNetworkMarker(event);
172-
}
173-
174-
// process outbound
175-
this.networkManagerInjector.getInjector(receiver).sendClientboundPacket(packet.getHandle(), marker, filters);
165+
});
176166
}
167+
168+
this.networkManagerInjector.getInjector(receiver).sendClientboundPacket(packet.getHandle(), marker, filters);
177169
}
178170

179171
@Override
@@ -200,34 +192,40 @@ public void receiveClientPacket(Player sender, PacketContainer packet, boolean f
200192

201193
@Override
202194
public void receiveClientPacket(Player sender, PacketContainer packet, NetworkMarker marker, boolean filters) {
203-
if (!this.closed) {
204-
// make sure we are on the main thread if any listener of the packet needs it
205-
if (this.hasMainThreadListener(packet.getType()) && !this.server.isPrimaryThread()) {
206-
ProtocolLibrary.getScheduler().runTask(
207-
() -> this.receiveClientPacket(sender, packet, marker, filters));
195+
if (this.closed) {
196+
return;
197+
}
198+
199+
// make sure we are on the main thread if any listener of the packet needs it
200+
if (filters && this.requiresMainThread(packet)) {
201+
ProtocolLibrary.getScheduler().runTask(
202+
() -> this.receiveClientPacket(sender, packet, marker, filters));
203+
return;
204+
}
205+
206+
Object nmsPacket = packet.getHandle();
207+
208+
if (filters) {
209+
PacketEvent event = PacketEvent.fromClient(this, packet, marker, sender, false);
210+
this.invokeInboundPacketListeners(event);
211+
212+
if (event.isCancelled()) {
208213
return;
209214
}
210215

211-
Object nmsPacket = packet.getHandle();
212-
// check to which listeners we need to post the packet
213-
if (filters) {
214-
// post to all listeners
215-
PacketEvent event = PacketEvent.fromClient(this.networkManagerInjector, packet, null, sender);
216-
this.invokeInboundPacketListeners(event);
217-
if (event.isCancelled()) {
218-
return;
219-
}
220-
221-
// prevent possible de-sync
222-
nmsPacket = event.getPacket().getHandle();
223-
} else {
216+
// Prevent possible de-sync if the packet was replaced by a listener.
217+
nmsPacket = event.getPacket().getHandle();
218+
} else {
219+
// A monitor listener should never modify a packet/event so we can simply invoke monitor listeners
220+
// independently of our injector pipeline
221+
this.runMonitorListeners(packet, () -> {
224222
PacketEvent event = PacketEvent.fromClient(this, packet, marker, sender, false);
225223
this.inboundListeners.invoke(event, ListenerPriority.MONITOR);
226-
}
227-
228-
// post to the player inject, reset our cancel state change
229-
this.networkManagerInjector.getInjector(sender).readServerboundPacket(nmsPacket);
224+
});
230225
}
226+
227+
// post to the player inject, reset our cancel state change
228+
this.networkManagerInjector.getInjector(sender).readServerboundPacket(nmsPacket);
231229
}
232230

233231
@Override
@@ -516,12 +514,25 @@ public void invokeOutboundPacketListeners(PacketEvent event) {
516514
this.postPacketToListeners(this.outboundListeners, event, true);
517515
}
518516
}
517+
518+
private boolean requiresMainThread(PacketContainer packet) {
519+
return this.hasMainThreadListener(packet.getType()) && !this.server.isPrimaryThread();
520+
}
521+
522+
private void runMonitorListeners(PacketContainer packet, Runnable notifyMonitor) {
523+
if (this.requiresMainThread(packet)) {
524+
ProtocolLibrary.getScheduler().runTask(notifyMonitor);
525+
} else {
526+
notifyMonitor.run();
527+
}
528+
}
519529

520530
private void postPacketToListeners(PacketListenerSet listeners, PacketEvent event, boolean outbound) {
521531
try {
522532
// append async marker if any async listener for the packet was registered
523533
if (this.asyncFilterManager.hasAsynchronousListeners(event)) {
524-
event.setAsyncMarker(this.asyncFilterManager.createAsyncMarker());
534+
Injector injector = this.networkManagerInjector.getInjector(event.getPlayer());
535+
event.setAsyncMarker(this.asyncFilterManager.createAsyncMarker(injector));
525536
}
526537

527538
// post to sync listeners

src/main/java/com/comphenix/protocol/injector/netty/channel/NettyChannelInjector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ public void close() {
243243
this.uninject();
244244

245245
// remove any outgoing references
246-
this.channel.attr(INJECTOR).remove();
246+
this.channel.attr(INJECTOR).set(null);
247247

248248
// cleanup
249249
this.savedMarkers.clear();

src/main/java/com/comphenix/protocol/injector/netty/channel/NettyEventLoopProxy.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ public ChannelFuture register(ChannelPromise channelPromise) {
267267
return this.delegate.register(channelPromise);
268268
}
269269

270+
@Deprecated
270271
@Override
271272
public ChannelFuture register(Channel channel, ChannelPromise promise) {
272273
return this.delegate.register(channel, promise);

0 commit comments

Comments
 (0)