Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public interface Filter {
/**
* Do something with the IChunk<br>
*/
default @Nonnull <T extends IChunk> T applyChunk(T chunk, @Nullable Region region) {
default @Nonnull <U extends IChunk> U applyChunk(U chunk, @Nullable Region region) {
return chunk;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package com.fastasyncworldedit.core.queue;

import com.fastasyncworldedit.core.queue.implementation.chunk.ChunkHolder;
import com.fastasyncworldedit.core.queue.implementation.chunk.WrapperChunk;
import org.jetbrains.annotations.ApiStatus;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

@ApiStatus.Internal
public interface IQueueChunk<T extends Future<T>> extends IChunk, Callable<T> {

/**
Expand All @@ -15,6 +20,25 @@ default IQueueChunk<T> reset() {
return this;
}

/**
* Set a new {@link WrapperChunk} that allows prevention of a {@link ChunkHolder} instance being cached "locally" whilst it
* has been called/submitted, causing issues with processing/postprocessing, etc.
* If a wrapper has already been set, throws {@link IllegalStateException} as there should be no circumstance for us to set
* a new wrapper (does nothing if attempting to set the same wrapper).
*
* @param parentWrapper wrapper wrapping this {@link ChunkHolder instance}
* @throws IllegalStateException if there is already a wrapper set and a new wrapper instance is attempted to be se
* @since TODO
*/
void setWrapper(WrapperChunk<?> parentWrapper);

/**
* Invalidate the {@link WrapperChunk} if present.
*
* @since TODO
*/
void invalidateWrapper();

/**
* Apply the queued changes to the world containing this chunk.
* <p>The future returned may return another future. To ensure completion keep calling {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import com.fastasyncworldedit.core.internal.simd.SimdSupport;
import com.fastasyncworldedit.core.internal.simd.VectorizedCharFilterBlock;
import com.fastasyncworldedit.core.internal.simd.VectorizedFilter;
import com.fastasyncworldedit.core.queue.implementation.chunk.ChunkHolder;
import com.fastasyncworldedit.core.queue.implementation.chunk.WrapperChunk;
import com.sk89q.worldedit.extent.Extent;
import com.sk89q.worldedit.function.operation.Operation;
import com.sk89q.worldedit.math.BlockVector2;
Expand Down Expand Up @@ -166,11 +168,24 @@ default ChunkFilterBlock apply(
// if (!filter.appliesChunk(chunkX, chunkZ)) {
// return block;
// }
T chunk = this.getOrCreateChunk(chunkX, chunkZ);
T initial = this.getOrCreateChunk(chunkX, chunkZ);
WrapperChunk<T> chunk = new WrapperChunk<>(initial, () -> this.getOrCreateChunk(chunkX, chunkZ));
if (initial instanceof ChunkHolder<?> holder) {
holder.setWrapper(chunk);
}

T newChunk = filter.applyChunk(chunk, region);
IChunk newChunk = filter.applyChunk(chunk, region);
if (newChunk == chunk) {
newChunk = chunk.get();
} else {
T c = (T) newChunk;
chunk.setWrapped(c);
// The IDE lies, it is possible for it to be a ChunkHolder because we're a little loose with our generic types...
if (c instanceof ChunkHolder<?> holder) {
holder.setWrapper(chunk);
}
}
if (newChunk != null) {
chunk = newChunk;
if (block == null) {
if (SimdSupport.useVectorApi() && filter instanceof VectorizedFilter) {
block = new VectorizedCharFilterBlock(this);
Expand All @@ -181,12 +196,16 @@ default ChunkFilterBlock apply(
block.initChunk(chunkX, chunkZ);
chunk.filterBlocks(filter, block, region, full);
}
this.submit(chunk);
// If null, then assume it has already been submitted and the WrapperChunk has therefore been invalidated
T toSubmit = chunk.get();
if (toSubmit != null) {
this.submit(toSubmit);
}
return block;
}

@Override
default <T extends Filter> T apply(Region region, T filter, boolean full) {
default <U extends Filter> U apply(Region region, U filter, boolean full) {
final Set<BlockVector2> chunks = region.getChunks();
ChunkFilterBlock block = null;
for (BlockVector2 chunk : chunks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

/**
Expand All @@ -53,6 +54,7 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen
private final Long2ObjectLinkedOpenHashMap<IQueueChunk<?>> chunks = new Long2ObjectLinkedOpenHashMap<>();
private final ConcurrentLinkedQueue<Future<?>> submissions = new ConcurrentLinkedQueue<>();
private final ReentrantLock getChunkLock = new ReentrantLock();
private final AtomicReference<IQueueChunk> lastChunk = new AtomicReference<>();
private World world = null;
private int minY = 0;
private int maxY = 255;
Expand All @@ -61,7 +63,6 @@ public class SingleThreadQueueExtent extends ExtentBatchProcessorHolder implemen
private boolean initialized;
private Thread currentThread;
// Last access pointers
private volatile IQueueChunk lastChunk;
private boolean enabledQueue = true;
private boolean fastmode = false;
// Array for lazy avoidance of concurrent modification exceptions and needless overcomplication of code (synchronisation is
Expand Down Expand Up @@ -161,13 +162,13 @@ protected synchronized void reset() {
return;
}
getChunkLock.lock();
this.lastChunk.set(null);
try {
this.chunks.clear();
} finally {
getChunkLock.unlock();
}
this.enabledQueue = true;
this.lastChunk = null;
this.currentThread = null;
this.initialized = false;
this.setProcessor(EmptyBatchProcessor.getInstance());
Expand Down Expand Up @@ -221,9 +222,7 @@ public boolean isEmpty() {

@Override
public <V extends Future<V>> V submit(IQueueChunk chunk) {
if (lastChunk == chunk) {
lastChunk = null;
}
this.lastChunk.compareAndExchange(chunk, null);
final long index = MathMan.pairInt(chunk.getX(), chunk.getZ());
getChunkLock.lock();
chunks.remove(index, chunk);
Expand Down Expand Up @@ -254,6 +253,8 @@ private <V extends Future<V>> V submitUnchecked(IQueueChunk chunk) {
}
}

chunk.invalidateWrapper();

if (Fawe.isMainThread()) {
V result = (V) chunk.call();
if (result == null) {
Expand All @@ -277,8 +278,9 @@ public <V extends Future<V>> V submitTaskUnchecked(Callable<V> callable) {
public synchronized boolean trim(boolean aggressive) {
cacheGet.trim(aggressive);
cacheSet.trim(aggressive);
LOGGER.info("trim");
if (Thread.currentThread() == currentThread) {
lastChunk = null;
lastChunk.set(null);
return chunks.isEmpty();
}
if (!submissions.isEmpty()) {
Expand Down Expand Up @@ -316,7 +318,7 @@ public IQueueChunk wrap(IQueueChunk chunk) {

@Override
public final IQueueChunk getOrCreateChunk(int x, int z) {
final IQueueChunk lastChunk = this.lastChunk;
final IQueueChunk lastChunk = this.lastChunk.get();
if (lastChunk != null && lastChunk.getX() == x && lastChunk.getZ() == z) {
return lastChunk;
}
Expand All @@ -330,7 +332,7 @@ public final IQueueChunk getOrCreateChunk(int x, int z) {
try {
IQueueChunk chunk = chunks.get(pair);
if (chunk != null) {
this.lastChunk = chunk;
this.lastChunk.set(chunk);
return chunk;
}
final int size = chunks.size();
Expand All @@ -340,8 +342,9 @@ public final IQueueChunk getOrCreateChunk(int x, int z) {
// - queue size > target size and primary queue has less than num threads submissions
int targetSize = lowMem ? Settings.settings().QUEUE.PARALLEL_THREADS + 8 : this.targetSize;
if (enabledQueue && size > targetSize && (lowMem || Fawe.instance().getQueueHandler().isUnderutilized())) {
chunk = chunks.removeFirst();
final Future future = submitUnchecked(chunk);
IQueueChunk toSubmit = chunks.removeFirst();
this.lastChunk.compareAndExchange(toSubmit, null);
final Future future = submitUnchecked(toSubmit);
if (future != null && !future.isDone()) {
pollSubmissions(targetSize, lowMem);
submissions.add(future);
Expand All @@ -351,7 +354,7 @@ public final IQueueChunk getOrCreateChunk(int x, int z) {
chunk = wrap(chunk);

chunks.put(pair, chunk);
this.lastChunk = chunk;
this.lastChunk.set(chunk);

return chunk;
} finally {
Expand Down Expand Up @@ -481,6 +484,7 @@ public synchronized void flush() {
if (MemUtil.isMemoryLimited()) {
while (!chunks.isEmpty()) {
IQueueChunk chunk = chunks.removeFirst();
this.lastChunk.compareAndExchange(chunk, null);
final Future future = submitUnchecked(chunk);
if (future != null && !future.isDone()) {
pollSubmissions(Settings.settings().QUEUE.PARALLEL_THREADS, true);
Expand All @@ -490,6 +494,7 @@ public synchronized void flush() {
} else {
while (!chunks.isEmpty()) {
IQueueChunk chunk = chunks.removeFirst();
this.lastChunk.compareAndExchange(chunk, null);
final Future future = submitUnchecked(chunk);
if (future != null && !future.isDone()) {
submissions.add(future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
*/
@SuppressWarnings("rawtypes")
public class ChunkHolder<T extends Future<T>> implements IQueueChunk<T> {

private static final Logger LOGGER = LogManagerCompat.getLogger();

public static ChunkHolder newInstance() {
Expand All @@ -45,7 +46,7 @@ public static ChunkHolder newInstance() {
private volatile IChunkGet chunkExisting; // The existing chunk (e.g. a clipboard, or the world, before changes)
private volatile IChunkSet chunkSet; // The blocks to be set to the chunkExisting
private IBlockDelegate delegate; // delegate handles the abstraction of the chunk layers
private IQueueExtent<? extends IChunk> extent; // the parent queue extent which has this chunk
private IQueueExtent<?> extent; // the parent queue extent which has this chunk
private int chunkX;
private int chunkZ;
private boolean fastmode;
Expand All @@ -54,6 +55,7 @@ public static ChunkHolder newInstance() {
private boolean createCopy = false;
private long initTime = -1L;
private SideEffectSet sideEffectSet;
private WrapperChunk<?> parentWrapper = null;

private ChunkHolder() {
this.delegate = NULL;
Expand All @@ -63,7 +65,28 @@ public void init(IBlockDelegate delegate) {
this.delegate = delegate;
}

@Override
public void setWrapper(WrapperChunk<?> parentWrapper) {
if (parentWrapper == this.parentWrapper) {
return;
}
if (this.parentWrapper != null) {
throw new IllegalStateException("Wrapper already set");
}
this.parentWrapper = parentWrapper;
}

@Override
public void invalidateWrapper() {
if (this.parentWrapper != null) {
if (!this.parentWrapper.invalidate(this)) {
throw new IllegalStateException("Existing chunk not equal to expected");
}
}
}

private static final AtomicBoolean recycleWarning = new AtomicBoolean(false);

@Override
public void recycle() {
if (!recycleWarning.getAndSet(true)) {
Expand Down Expand Up @@ -346,10 +369,12 @@ public int[] getHeightMap(ChunkHolder chunk, HeightMapType type) {

@Override
public void flushLightToGet(ChunkHolder chunk) {
chunk.chunkExisting.setLightingToGet(chunk.chunkSet.getLight(), chunk.chunkSet.getMinSectionPosition(),
chunk.chunkExisting.setLightingToGet(
chunk.chunkSet.getLight(), chunk.chunkSet.getMinSectionPosition(),
chunk.chunkSet.getMaxSectionPosition()
);
chunk.chunkExisting.setSkyLightingToGet(chunk.chunkSet.getSkyLight(), chunk.chunkSet.getMinSectionPosition(),
chunk.chunkExisting.setSkyLightingToGet(
chunk.chunkSet.getSkyLight(), chunk.chunkSet.getMinSectionPosition(),
chunk.chunkSet.getMaxSectionPosition()
);
}
Expand Down Expand Up @@ -892,10 +917,15 @@ public boolean hasNonEmptySection(final int layer) {
public synchronized void filterBlocks(Filter filter, ChunkFilterBlock block, @Nullable Region region, boolean full) {
final IChunkGet get = getOrCreateGet();
final IChunkSet set = getOrCreateSet();
if (parentWrapper == null) {
parentWrapper = new WrapperChunk<>(this, () -> this.extent.getOrCreateChunk(getX(), getZ()));
Comment thread
dordsor21 marked this conversation as resolved.
} else if (parentWrapper.get() != this) {
throw new IllegalStateException("Parent WrapperChunk is not storing this chunk!?");
}
try {
block.filter(this, get, set, filter, region, full);
block.filter(parentWrapper, get, set, filter, region, full);
} finally {
filter.finishChunk(this);
filter.finishChunk(parentWrapper);
}
}

Expand Down Expand Up @@ -998,6 +1028,12 @@ public synchronized <V extends IChunk> void init(IQueueExtent<V> extent, int chu
this.extent = extent;
this.chunkX = chunkX;
this.chunkZ = chunkZ;
if (this.parentWrapper != null) {
if (!this.parentWrapper.invalidate(this)) {
throw new IllegalStateException("Existing chunk not equal to expected");
}
this.parentWrapper = null;
}
if (chunkSet != null) {
chunkSet.reset();
delegate = SET;
Expand All @@ -1013,9 +1049,11 @@ public synchronized T call() {
if (chunkSet != null && !chunkSet.isEmpty()) {
IChunkSet copy = chunkSet.createCopy();

return this.call(extent, copy, () -> {
// Do nothing
});
return this.call(
extent, copy, () -> {
// Do nothing
}
);
}
return null;
}
Expand All @@ -1025,14 +1063,20 @@ public synchronized T call() {
*/

@Override
public <U extends Future<U>> U call(IQueueExtent<? extends IChunk> owner, IChunkSet set, Runnable finalize) {
public <U extends Future<U>> U call(IQueueExtent<?> owner, IChunkSet set, Runnable finalize) {
if (set != null) {
if (parentWrapper != null) {
if (!parentWrapper.invalidate(this)) {
throw new IllegalStateException("Existing chunk not equal to expected");
}
}
IChunkGet get = getOrCreateGet();
try {
get.lockCall();
trackExtent();
boolean postProcess = !(getExtent().getPostProcessor() instanceof EmptyBatchProcessor);
final int copyKey = get.setCreateCopy(postProcess);
// We should always be performing processing/postprocessing with this instance (i.e. not with this.parentWrapper)
final IChunkSet iChunkSet = getExtent().processSet(this, get, set);
Runnable finalizer;
if (postProcess) {
Expand Down Expand Up @@ -1067,7 +1111,7 @@ private void untrackExtent() {
/**
* Get the extent this chunk is in.
*/
public IQueueExtent<? extends IChunk> getExtent() {
public IQueueExtent<?> getExtent() {
return extent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ public boolean isEmpty() {
return true;
}

@Override
public void setWrapper(final WrapperChunk parentWrapper) {
}

@Override
public void invalidateWrapper() {
}

public Future call() {
return null;
}
Expand Down
Loading
Loading