From ce772a5ca18c7a5afe498579a3304e1d71f5fcc7 Mon Sep 17 00:00:00 2001 From: HashimTheArab Date: Sun, 25 Jan 2026 17:14:44 +0300 Subject: [PATCH 1/5] =?UTF-8?q?Fix:=20async=20prefetch=20for=20I/O=20+=20?= =?UTF-8?q?=E2=80=9CFill=E2=80=9D=20lighting?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I added an async prefetch pipeline in server/world/prefetch.go and changed Loader.Load() to: - Never call tx.w.chunk(pos) on a missing chunk. - Instead, schedule a background prefetch job (requestPrefetch) and retry later. The prefetch job runs outside the world transaction goroutine and does: - Provider.LoadColumn (disk I/O off-tx) - LightArea(...).Fill() (per-chunk lighting off-tx) - then installs the result using a short world transaction (inserts into w.chunks, attaches entities, then does cross-chunk spreading via w.calculateLight(pos)). Benefits - Removes disk I/O from the world tx path for chunk streaming. - Removes the expensive “Fill” lighting pass from the world tx path (this is typically the biggest CPU chunk-light cost). - In practice this should reduce tick spikes / input lag when players move into new areas, because chunk misses no longer block all transactions. --- server/world/conf.go | 3 ++ server/world/loader.go | 12 ++++- server/world/prefetch.go | 102 +++++++++++++++++++++++++++++++++++++++ server/world/world.go | 9 ++++ 4 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 server/world/prefetch.go diff --git a/server/world/conf.go b/server/world/conf.go index 02afa8781..f5a8510a8 100644 --- a/server/world/conf.go +++ b/server/world/conf.go @@ -96,6 +96,8 @@ func (conf Config) New() *World { entities: make(map[*EntityHandle]ChunkPos), viewers: make(map[*Loader]Viewer), chunks: make(map[ChunkPos]*Column), + prefetchRequests: make(chan ChunkPos, 512), + prefetchInFlight: make(map[ChunkPos]struct{}), queueClosing: make(chan struct{}), closing: make(chan struct{}), queue: make(chan transaction, 128), @@ -116,6 +118,7 @@ func (conf Config) New() *World { go t.tickLoop(w) go w.autoSave() go w.handleTransactions() + go w.prefetchLoop() <-w.Exec(t.tick) return w diff --git a/server/world/loader.go b/server/world/loader.go index 7b9b8b39b..464ac07cb 100644 --- a/server/world/loader.go +++ b/server/world/loader.go @@ -100,7 +100,17 @@ func (l *Loader) Load(tx *Tx, n int) { } pos := l.loadQueue[0] - c := tx.w.chunk(pos) + c, ok := tx.w.chunks[pos] + if !ok { + // Don't block the world transaction goroutine on provider IO and lighting. Instead, schedule a background + // prefetch and retry this chunk later. + tx.w.requestPrefetch(pos) + if len(l.loadQueue) > 1 { + // Rotate this chunk to the end so we can schedule prefetches for additional chunks this tick. + l.loadQueue = append(l.loadQueue[1:], pos) + } + continue + } l.viewer.ViewChunk(pos, l.w.Dimension(), c.BlockEntities, c.Chunk) l.w.addViewer(tx, c, l) diff --git a/server/world/prefetch.go b/server/world/prefetch.go new file mode 100644 index 000000000..2cf954e1e --- /dev/null +++ b/server/world/prefetch.go @@ -0,0 +1,102 @@ +package world + +import ( + "errors" + + "github.com/df-mc/dragonfly/server/world/chunk" + "github.com/df-mc/goleveldb/leveldb" +) + +const prefetchWorkers = 4 + +// prefetchLoop runs worker goroutines that load columns from the provider and precompute per-chunk lighting outside +// of the world transaction goroutine. The results are then installed using a short transaction. +func (w *World) prefetchLoop() { + for range prefetchWorkers { + go func() { + for { + select { + case pos := <-w.prefetchRequests: + w.prefetchOne(pos) + case <-w.closing: + return + } + } + }() + } + <-w.closing +} + +// prefetchOne performs provider IO and per-chunk light filling, then schedules installation in the world. It must +// not mutate world state directly. +func (w *World) prefetchOne(pos ChunkPos) { + // Do the expensive IO/CPU work off the world transaction goroutine. + c, err := w.conf.Provider.LoadColumn(pos, w.conf.Dim) + if errors.Is(err, leveldb.ErrNotFound) { + // The provider doesn't have a chunk saved at this position, so we generate a new one. + // Generation is not expected to be common for servers using pre-made maps, but we still handle it. + w.genMu.Lock() + col := &chunk.Column{Chunk: chunk.New(airRID, w.Range())} + w.conf.Generator.GenerateChunk(pos, col.Chunk) + w.genMu.Unlock() + + c, err = col, nil + } else if err != nil { + // Failed reading from provider. We'll still create an empty chunk so that the world can keep running. + c = &chunk.Column{Chunk: chunk.New(airRID, w.Range())} + } + + // Fill light for this chunk only. Cross-chunk spreading is done when installing the chunk (transaction-owned). + chunk.LightArea([]*chunk.Chunk{c.Chunk}, int(pos[0]), int(pos[1])).Fill() + + _ = w.Exec(func(tx *Tx) { + w.installPrefetched(pos, c, err) + }) +} + +func (w *World) installPrefetched(pos ChunkPos, c *chunk.Column, loadErr error) { + // Mark no longer in-flight, regardless of whether we end up installing. + w.prefetchMu.Lock() + delete(w.prefetchInFlight, pos) + w.prefetchMu.Unlock() + + if _, ok := w.chunks[pos]; ok { + // Already loaded by some other path. + return + } + + col := w.columnFrom(c, pos) + w.chunks[pos] = col + for _, e := range col.Entities { + w.entities[e] = pos + e.w = w + } + + // Spreading light requires neighbouring chunks and must happen with transaction-owned access to chunks. + w.calculateLight(pos) + + if loadErr != nil && !errors.Is(loadErr, leveldb.ErrNotFound) { + w.conf.Log.Error("load chunk: "+loadErr.Error(), "X", pos[0], "Z", pos[1]) + } +} + +// requestPrefetch schedules an asynchronous load of the chunk at pos if it isn't already loaded or in-flight. +// This method is intended to be called from within a world transaction. +func (w *World) requestPrefetch(pos ChunkPos) { + if _, ok := w.chunks[pos]; ok { + return + } + + w.prefetchMu.Lock() + if _, ok := w.prefetchInFlight[pos]; ok { + w.prefetchMu.Unlock() + return + } + w.prefetchInFlight[pos] = struct{}{} + w.prefetchMu.Unlock() + + select { + case w.prefetchRequests <- pos: + case <-w.closing: + } +} diff --git a/server/world/world.go b/server/world/world.go index 87e0ac8f3..a666c66cd 100644 --- a/server/world/world.go +++ b/server/world/world.go @@ -52,6 +52,15 @@ type World struct { // from this map after some time of not being used. chunks map[ChunkPos]*Column + // prefetchRequests is used to asynchronously load columns (and precompute per-chunk lighting) outside of + // the world transaction goroutine, so that loading chunks for viewers doesn't block the entire world. + prefetchRequests chan ChunkPos + prefetchMu sync.Mutex + prefetchInFlight map[ChunkPos]struct{} + // genMu serialises chunk generation for async prefetch. Generation is not expected to be performance critical + // for most servers using pre-made maps, but this ensures generator implementations aren't invoked concurrently. + genMu sync.Mutex + // entities holds a map of entities currently loaded and the last ChunkPos // that the Entity was in. These are tracked so that a call to RemoveEntity // can find the correct Entity. From 0a62ef589faae6e646442ee5fcde8b1b2fa00b99 Mon Sep 17 00:00:00 2001 From: HashimTheArab Date: Sun, 25 Jan 2026 17:59:24 +0300 Subject: [PATCH 2/5] fix issues --- server/world/conf.go | 2 +- server/world/prefetch.go | 24 ++++++++++++++++++------ server/world/world.go | 3 +++ 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/server/world/conf.go b/server/world/conf.go index f5a8510a8..3446efa62 100644 --- a/server/world/conf.go +++ b/server/world/conf.go @@ -112,7 +112,7 @@ func (conf Config) New() *World { w.handler.Store(&h) w.queueing.Add(1) - w.running.Add(2) + w.running.Add(2 + prefetchWorkers) t := ticker{interval: time.Second / 20} go t.tickLoop(w) diff --git a/server/world/prefetch.go b/server/world/prefetch.go index 2cf954e1e..168ea1502 100644 --- a/server/world/prefetch.go +++ b/server/world/prefetch.go @@ -14,17 +14,20 @@ const prefetchWorkers = 4 func (w *World) prefetchLoop() { for range prefetchWorkers { go func() { + defer w.running.Done() for { select { - case pos := <-w.prefetchRequests: - w.prefetchOne(pos) case <-w.closing: return + case pos, ok := <-w.prefetchRequests: + if !ok { + return + } + w.prefetchOne(pos) } } }() } - <-w.closing } // prefetchOne performs provider IO and per-chunk light filling, then schedules installation in the world. It must @@ -49,9 +52,18 @@ func (w *World) prefetchOne(pos ChunkPos) { // Fill light for this chunk only. Cross-chunk spreading is done when installing the chunk (transaction-owned). chunk.LightArea([]*chunk.Chunk{c.Chunk}, int(pos[0]), int(pos[1])).Fill() - _ = w.Exec(func(tx *Tx) { - w.installPrefetched(pos, c, err) - }) + select { + case <-w.closing: + // World is closing: don't enqueue a transaction that might never run. + w.prefetchMu.Lock() + delete(w.prefetchInFlight, pos) + w.prefetchMu.Unlock() + return + default: + _ = w.Exec(func(tx *Tx) { + w.installPrefetched(pos, c, err) + }) + } } func (w *World) installPrefetched(pos ChunkPos, c *chunk.Column, loadErr error) { diff --git a/server/world/world.go b/server/world/world.go index a666c66cd..c4383310b 100644 --- a/server/world/world.go +++ b/server/world/world.go @@ -1050,6 +1050,7 @@ func (w *World) close() { }) close(w.closing) + close(w.prefetchRequests) w.running.Wait() close(w.queueClosing) @@ -1181,7 +1182,9 @@ func (w *World) loadChunk(pos ChunkPos) (*Column, error) { col := newColumn(chunk.New(airRID, w.Range())) w.chunks[pos] = col + w.genMu.Lock() w.conf.Generator.GenerateChunk(pos, col.Chunk) + w.genMu.Unlock() return col, nil default: return newColumn(chunk.New(airRID, w.Range())), err From dbb3d27be83178a7400a03a649572361bbba3932 Mon Sep 17 00:00:00 2001 From: HashimTheArab Date: Sun, 25 Jan 2026 18:38:16 +0300 Subject: [PATCH 3/5] fix issue --- server/world/loader.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/world/loader.go b/server/world/loader.go index 464ac07cb..0acf1c325 100644 --- a/server/world/loader.go +++ b/server/world/loader.go @@ -108,6 +108,9 @@ func (l *Loader) Load(tx *Tx, n int) { if len(l.loadQueue) > 1 { // Rotate this chunk to the end so we can schedule prefetches for additional chunks this tick. l.loadQueue = append(l.loadQueue[1:], pos) + } else { + // No other work can be done until this chunk finishes loading. + break } continue } From 67809f883a6312e86326714895ac38ea0e6dd904 Mon Sep 17 00:00:00 2001 From: HashimTheArab Date: Wed, 25 Feb 2026 18:43:54 -0500 Subject: [PATCH 4/5] lets try this --- server/world/conf.go | 5 +- server/world/metrics.go | 167 ++++++++++++++++++++++++++++++++++ server/world/prefetch.go | 10 ++ server/world/save_pipeline.go | 75 +++++++++++++++ server/world/tick.go | 155 ++++++++++++++++++++++++++----- server/world/tx.go | 4 + server/world/world.go | 33 ++++++- 7 files changed, 418 insertions(+), 31 deletions(-) create mode 100644 server/world/metrics.go create mode 100644 server/world/save_pipeline.go diff --git a/server/world/conf.go b/server/world/conf.go index 3446efa62..473539ea3 100644 --- a/server/world/conf.go +++ b/server/world/conf.go @@ -96,6 +96,7 @@ func (conf Config) New() *World { entities: make(map[*EntityHandle]ChunkPos), viewers: make(map[*Loader]Viewer), chunks: make(map[ChunkPos]*Column), + saveRequests: make(chan saveRequest, 512), prefetchRequests: make(chan ChunkPos, 512), prefetchInFlight: make(map[ChunkPos]struct{}), queueClosing: make(chan struct{}), @@ -112,13 +113,15 @@ func (conf Config) New() *World { w.handler.Store(&h) w.queueing.Add(1) - w.running.Add(2 + prefetchWorkers) + w.running.Add(3 + prefetchWorkers + saveWorkers) t := ticker{interval: time.Second / 20} go t.tickLoop(w) go w.autoSave() go w.handleTransactions() go w.prefetchLoop() + go w.saveLoop() + go w.metricsLoop() <-w.Exec(t.tick) return w diff --git a/server/world/metrics.go b/server/world/metrics.go new file mode 100644 index 000000000..ccfb402a8 --- /dev/null +++ b/server/world/metrics.go @@ -0,0 +1,167 @@ +package world + +import ( + "sync/atomic" + "time" +) + +const worldMetricsLogInterval = 10 * time.Second + +type worldMetrics struct { + execEnqueueCount atomic.Uint64 + execEnqueueNanos atomic.Uint64 + execEnqueueMaxNanos atomic.Uint64 + queueLenMax atomic.Int64 + + txCount atomic.Uint64 + txNanos atomic.Uint64 + txMaxNanos atomic.Uint64 + + tickCount atomic.Uint64 + tickNanos atomic.Uint64 + tickMaxNanos atomic.Uint64 + + tickEntityNanos atomic.Uint64 + tickScheduledNanos atomic.Uint64 + tickRandomNanos atomic.Uint64 + tickNeighbourNanos atomic.Uint64 + + prefetchDropped atomic.Uint64 +} + +type worldMetricsSnapshot struct { + execEnqueueCount uint64 + execEnqueueNanos uint64 + execEnqueueMaxNanos uint64 + queueLenMax int64 + + txCount uint64 + txNanos uint64 + txMaxNanos uint64 + + tickCount uint64 + tickNanos uint64 + tickMaxNanos uint64 + + tickEntityNanos uint64 + tickScheduledNanos uint64 + tickRandomNanos uint64 + tickNeighbourNanos uint64 + + prefetchDropped uint64 +} + +func (m *worldMetrics) observeExecEnqueue(d time.Duration, queueLen int) { + nanos := uint64(d) + m.execEnqueueCount.Add(1) + m.execEnqueueNanos.Add(nanos) + updateMaxUint64(&m.execEnqueueMaxNanos, nanos) + updateMaxInt64(&m.queueLenMax, int64(queueLen)) +} + +func (m *worldMetrics) observeTx(d time.Duration) { + nanos := uint64(d) + m.txCount.Add(1) + m.txNanos.Add(nanos) + updateMaxUint64(&m.txMaxNanos, nanos) +} + +func (m *worldMetrics) observeTick(total, entity, scheduled, random, neighbour time.Duration) { + m.tickCount.Add(1) + m.tickNanos.Add(uint64(total)) + m.tickEntityNanos.Add(uint64(entity)) + m.tickScheduledNanos.Add(uint64(scheduled)) + m.tickRandomNanos.Add(uint64(random)) + m.tickNeighbourNanos.Add(uint64(neighbour)) + updateMaxUint64(&m.tickMaxNanos, uint64(total)) +} + +func (m *worldMetrics) observePrefetchDropped() { + m.prefetchDropped.Add(1) +} + +func (m *worldMetrics) snapshotAndReset() worldMetricsSnapshot { + return worldMetricsSnapshot{ + execEnqueueCount: m.execEnqueueCount.Swap(0), + execEnqueueNanos: m.execEnqueueNanos.Swap(0), + execEnqueueMaxNanos: m.execEnqueueMaxNanos.Swap(0), + queueLenMax: m.queueLenMax.Swap(0), + txCount: m.txCount.Swap(0), + txNanos: m.txNanos.Swap(0), + txMaxNanos: m.txMaxNanos.Swap(0), + tickCount: m.tickCount.Swap(0), + tickNanos: m.tickNanos.Swap(0), + tickMaxNanos: m.tickMaxNanos.Swap(0), + tickEntityNanos: m.tickEntityNanos.Swap(0), + tickScheduledNanos: m.tickScheduledNanos.Swap(0), + tickRandomNanos: m.tickRandomNanos.Swap(0), + tickNeighbourNanos: m.tickNeighbourNanos.Swap(0), + prefetchDropped: m.prefetchDropped.Swap(0), + } +} + +func (w *World) metricsLoop() { + defer w.running.Done() + + t := time.NewTicker(worldMetricsLogInterval) + defer t.Stop() + + for { + select { + case <-w.closing: + return + case <-t.C: + s := w.metrics.snapshotAndReset() + if s.txCount == 0 && s.tickCount == 0 && s.prefetchDropped == 0 { + continue + } + w.conf.Log.Debug( + "world perf", + "window", worldMetricsLogInterval.String(), + "queue_max", s.queueLenMax, + "enqueue_avg_ms", avgMillis(s.execEnqueueNanos, s.execEnqueueCount), + "enqueue_max_ms", millisFromNanos(s.execEnqueueMaxNanos), + "tx_count", s.txCount, + "tx_avg_ms", avgMillis(s.txNanos, s.txCount), + "tx_max_ms", millisFromNanos(s.txMaxNanos), + "tick_count", s.tickCount, + "tick_avg_ms", avgMillis(s.tickNanos, s.tickCount), + "tick_max_ms", millisFromNanos(s.tickMaxNanos), + "tick_entities_avg_ms", avgMillis(s.tickEntityNanos, s.tickCount), + "tick_scheduled_avg_ms", avgMillis(s.tickScheduledNanos, s.tickCount), + "tick_random_avg_ms", avgMillis(s.tickRandomNanos, s.tickCount), + "tick_neighbour_avg_ms", avgMillis(s.tickNeighbourNanos, s.tickCount), + "prefetch_dropped", s.prefetchDropped, + ) + } + } +} + +func updateMaxUint64(dst *atomic.Uint64, value uint64) { + for { + current := dst.Load() + if value <= current || dst.CompareAndSwap(current, value) { + return + } + } +} + +func updateMaxInt64(dst *atomic.Int64, value int64) { + for { + current := dst.Load() + if value <= current || dst.CompareAndSwap(current, value) { + return + } + } +} + +func avgMillis(total, count uint64) float64 { + if count == 0 { + return 0 + } + return millisFromNanos(total) / float64(count) +} + +func millisFromNanos(nanos uint64) float64 { + return float64(nanos) / float64(time.Millisecond) +} diff --git a/server/world/prefetch.go b/server/world/prefetch.go index 168ea1502..5346c1c46 100644 --- a/server/world/prefetch.go +++ b/server/world/prefetch.go @@ -110,5 +110,15 @@ func (w *World) requestPrefetch(pos ChunkPos) { select { case w.prefetchRequests <- pos: case <-w.closing: + w.prefetchMu.Lock() + delete(w.prefetchInFlight, pos) + w.prefetchMu.Unlock() + default: + // Keep the transaction path non-blocking under load. If the queue is saturated, clear in-flight so we can + // retry this position later. + w.prefetchMu.Lock() + delete(w.prefetchInFlight, pos) + w.prefetchMu.Unlock() + w.metrics.observePrefetchDropped() } } diff --git a/server/world/save_pipeline.go b/server/world/save_pipeline.go new file mode 100644 index 000000000..d8dc7124d --- /dev/null +++ b/server/world/save_pipeline.go @@ -0,0 +1,75 @@ +package world + +import "github.com/df-mc/dragonfly/server/world/chunk" + +const saveWorkers = 1 + +type saveRequest struct { + pos ChunkPos + col *chunk.Column + ack chan struct{} +} + +// saveLoop runs background persistence workers that handle provider writes for queued save requests. +func (w *World) saveLoop() { + for range saveWorkers { + go func() { + defer w.running.Done() + + for { + select { + case req := <-w.saveRequests: + w.storeColumn(req.pos, req.col) + if req.ack != nil { + close(req.ack) + } + case <-w.closing: + // Drain any already queued requests before exiting. + for { + select { + case req := <-w.saveRequests: + w.storeColumn(req.pos, req.col) + if req.ack != nil { + close(req.ack) + } + default: + return + } + } + } + } + }() + } +} + +// submitAsyncColumnSave queues a chunk save request. If the queue is saturated, it falls back to synchronous storage +// to preserve ordering guarantees. +func (w *World) submitAsyncColumnSave(pos ChunkPos, col *chunk.Column) { + req := saveRequest{pos: pos, col: col} + select { + case w.saveRequests <- req: + return + default: + // Queue is saturated: Block until there is room to preserve write ordering across sync/async save paths. + w.saveRequests <- req + } +} + +// submitSyncColumnSave queues a save request and blocks until persistence completed. +func (w *World) submitSyncColumnSave(pos ChunkPos, col *chunk.Column) { + ack := make(chan struct{}) + req := saveRequest{pos: pos, col: col, ack: ack} + select { + case w.saveRequests <- req: + <-ack + case <-w.closing: + // Fall back to direct write if called after shutdown has started. + w.storeColumn(pos, col) + } +} + +func (w *World) storeColumn(pos ChunkPos, col *chunk.Column) { + if err := w.conf.Provider.StoreColumn(pos, w.conf.Dim, col); err != nil { + w.conf.Log.Error("save chunk: "+err.Error(), "X", pos[0], "Z", pos[1]) + } +} diff --git a/server/world/tick.go b/server/world/tick.go index 9d06cf526..1e3a53f09 100644 --- a/server/world/tick.go +++ b/server/world/tick.go @@ -1,6 +1,7 @@ package world import ( + "container/heap" "maps" "math/rand/v2" "slices" @@ -36,6 +37,7 @@ func (t ticker) tickLoop(w *World) { // tick performs a tick on the World and updates the time, weather, blocks and // entities that require updates. func (t ticker) tick(tx *Tx) { + tickStart := time.Now() viewers, loaders := tx.World().allViewers() w := tx.World() @@ -88,10 +90,23 @@ func (t ticker) tick(tx *Tx) { w.tickLightning(tx) } + entitiesStart := time.Now() t.tickEntities(tx, tick) + entitiesDur := time.Since(entitiesStart) + + scheduledStart := time.Now() w.scheduledUpdates.tick(tx, tick) + scheduledDur := time.Since(scheduledStart) + + randomStart := time.Now() t.tickBlocksRandomly(tx, loaders, tick) + randomDur := time.Since(randomStart) + + neighbourStart := time.Now() t.performNeighbourUpdates(tx) + neighbourDur := time.Since(neighbourStart) + + w.metrics.observeTick(time.Since(tickStart), entitiesDur, scheduledDur, randomDur, neighbourDur) } // performNeighbourUpdates performs all block updates that came as a result of a neighbouring block being changed. @@ -268,9 +283,14 @@ func (g *randUint4) uint4(r *rand.Rand) uint8 { // scheduledTickQueue implements a queue for scheduled block updates. Scheduled // block updates are both position and block type specific. type scheduledTickQueue struct { - ticks []scheduledTick + ticks scheduledTickHeap + currentTick int64 + + // furthestTicks tracks the furthest pending tick for a position+block hash. A newly scheduled tick is only + // added if it occurs after the furthest currently pending tick for the same position+block hash. furthestTicks map[scheduledTickIndex]int64 - currentTick int64 + // byChunk tracks scheduled ticks by chunk so saving/unloading chunks does not need to scan all pending ticks. + byChunk map[ChunkPos]map[*scheduledTick]struct{} } type scheduledTick struct { @@ -278,6 +298,8 @@ type scheduledTick struct { b Block bhash uint64 t int64 + + cancelled bool } type scheduledTickIndex struct { @@ -287,7 +309,13 @@ type scheduledTickIndex struct { // newScheduledTickQueue creates a queue for scheduled block ticks. func newScheduledTickQueue(tick int64) *scheduledTickQueue { - return &scheduledTickQueue{furthestTicks: make(map[scheduledTickIndex]int64), currentTick: tick} + q := &scheduledTickQueue{ + currentTick: tick, + furthestTicks: make(map[scheduledTickIndex]int64), + byChunk: make(map[ChunkPos]map[*scheduledTick]struct{}), + } + heap.Init(&q.ticks) + return q } // tick processes scheduled ticks, calling ScheduledTicker.ScheduledTick for any @@ -297,10 +325,26 @@ func (queue *scheduledTickQueue) tick(tx *Tx, tick int64) { queue.currentTick = tick w := tx.World() - for _, t := range queue.ticks { - if t.t > tick { + for queue.ticks.Len() > 0 { + t := queue.ticks[0] + if t.cancelled { + _ = heap.Pop(&queue.ticks) + queue.removeFromChunkIndex(t) continue } + if t.t > tick { + // Remaining ticks are all scheduled for later. + break + } + _ = heap.Pop(&queue.ticks) + queue.removeFromChunkIndex(t) + index := scheduledTickIndex{pos: t.pos, hash: t.bhash} + // Because ticks for the same position+hash are only added with increasing tick values, popping the + // furthest tick means no entries for this index remain. + if furthest, ok := queue.furthestTicks[index]; ok && furthest == t.t { + delete(queue.furthestTicks, index) + } + b := tx.Block(t.pos) if ticker, ok := b.(ScheduledTicker); ok && BlockHash(b) == t.bhash { ticker.ScheduledTick(t.pos, tx, w.r) @@ -310,14 +354,6 @@ func (queue *scheduledTickQueue) tick(tx *Tx, tick int64) { } } } - - // Clear scheduled ticks that were processed from the queue. - queue.ticks = slices.DeleteFunc(queue.ticks, func(t scheduledTick) bool { - return t.t <= tick - }) - maps.DeleteFunc(queue.furthestTicks, func(index scheduledTickIndex, t int64) bool { - return t <= tick - }) } // schedule schedules a block update at the position passed for the block type @@ -333,39 +369,108 @@ func (queue *scheduledTickQueue) schedule(pos cube.Pos, b Block, delay time.Dura // after any currently scheduled updates. return } + t := &scheduledTick{pos: pos, t: resTick, b: b, bhash: index.hash} queue.furthestTicks[index] = resTick - queue.ticks = append(queue.ticks, scheduledTick{pos: pos, t: resTick, b: b, bhash: index.hash}) + heap.Push(&queue.ticks, t) + + chunkPos := chunkPosFromBlockPos(pos) + m, ok := queue.byChunk[chunkPos] + if !ok { + m = make(map[*scheduledTick]struct{}, 1) + queue.byChunk[chunkPos] = m + } + m[t] = struct{}{} } // fromChunk returns all scheduled ticks positioned within a ChunkPos. func (queue *scheduledTickQueue) fromChunk(pos ChunkPos) []scheduledTick { - m := make([]scheduledTick, 0, 8) - for _, t := range queue.ticks { - if pos == chunkPosFromBlockPos(t.pos) { - m = append(m, t) + pending, ok := queue.byChunk[pos] + if !ok { + return nil + } + m := make([]scheduledTick, 0, len(pending)) + for t := range pending { + if t.cancelled { + continue } + m = append(m, *t) } return m } // removeChunk removes all scheduled ticks positioned within a ChunkPos. func (queue *scheduledTickQueue) removeChunk(pos ChunkPos) { - queue.ticks = slices.DeleteFunc(queue.ticks, func(tick scheduledTick) bool { - return chunkPosFromBlockPos(tick.pos) == pos - }) + pending, ok := queue.byChunk[pos] + if !ok { + return + } + for t := range pending { + t.cancelled = true + index := scheduledTickIndex{pos: t.pos, hash: t.bhash} + if furthest, ok := queue.furthestTicks[index]; ok && furthest == t.t { + delete(queue.furthestTicks, index) + } + } + delete(queue.byChunk, pos) } // add adds a slice of scheduled ticks to the queue. It assumes no duplicate // ticks are present in the slice. func (queue *scheduledTickQueue) add(ticks []scheduledTick) { - queue.ticks = append(queue.ticks, ticks...) for _, t := range ticks { index := scheduledTickIndex{pos: t.pos, hash: t.bhash} + tt := &scheduledTick{pos: t.pos, b: t.b, bhash: t.bhash, t: t.t} if existing, ok := queue.furthestTicks[index]; ok { - // Make sure we find the furthest tick for each of the ticks added. - // Some ticks may have the same block and position, in which case we - // need to set the furthest tick. queue.furthestTicks[index] = max(existing, t.t) + } else { + queue.furthestTicks[index] = t.t + } + heap.Push(&queue.ticks, tt) + + chunkPos := chunkPosFromBlockPos(t.pos) + m, ok := queue.byChunk[chunkPos] + if !ok { + m = make(map[*scheduledTick]struct{}, 1) + queue.byChunk[chunkPos] = m } + m[tt] = struct{}{} } } + +func (queue *scheduledTickQueue) removeFromChunkIndex(t *scheduledTick) { + chunkPos := chunkPosFromBlockPos(t.pos) + m, ok := queue.byChunk[chunkPos] + if !ok { + return + } + delete(m, t) + if len(m) == 0 { + delete(queue.byChunk, chunkPos) + } +} + +type scheduledTickHeap []*scheduledTick + +func (h scheduledTickHeap) Len() int { + return len(h) +} + +func (h scheduledTickHeap) Less(i, j int) bool { + return h[i].t < h[j].t +} + +func (h scheduledTickHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *scheduledTickHeap) Push(x any) { + *h = append(*h, x.(*scheduledTick)) +} + +func (h *scheduledTickHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[:n-1] + return x +} diff --git a/server/world/tx.go b/server/world/tx.go index 4183e02fe..43c5863f0 100644 --- a/server/world/tx.go +++ b/server/world/tx.go @@ -302,7 +302,9 @@ type normalTransaction struct { // ntx.c. func (ntx normalTransaction) Run(w *World) { tx := &Tx{w: w} + start := time.Now() ntx.f(tx) + w.metrics.observeTx(time.Since(start)) tx.close() close(ntx.c) } @@ -323,7 +325,9 @@ func (wtx weakTransaction) Run(w *World) { valid := !wtx.invalid.Load() if valid { tx := &Tx{w: w} + start := time.Now() wtx.f(tx) + w.metrics.observeTx(time.Since(start)) tx.close() } // We have to acquire a lock on wtx.cond.L here to make sure cond.Wait() diff --git a/server/world/world.go b/server/world/world.go index c4383310b..8d9d2f488 100644 --- a/server/world/world.go +++ b/server/world/world.go @@ -33,6 +33,7 @@ type World struct { queue chan transaction queueClosing chan struct{} queueing sync.WaitGroup + metrics worldMetrics // advance is a bool that specifies if this World should advance the current // tick, time and weather saved in the Settings struct held by the World. @@ -52,6 +53,10 @@ type World struct { // from this map after some time of not being used. chunks map[ChunkPos]*Column + // saveRequests is a background persistence queue used to keep slow provider writes off the transaction goroutine + // for chunk unloads. + saveRequests chan saveRequest + // prefetchRequests is used to asynchronously load columns (and precompute per-chunk lighting) outside of // the world transaction goroutine, so that loading chunks for viewers doesn't block the entire world. prefetchRequests chan ChunkPos @@ -124,13 +129,17 @@ type ExecFunc func(tx *Tx) // that is closed once the transaction is complete. func (w *World) Exec(f ExecFunc) <-chan struct{} { c := make(chan struct{}) + start := time.Now() w.queue <- normalTransaction{c: c, f: f} + w.metrics.observeExecEnqueue(time.Since(start), len(w.queue)) return c } func (w *World) weakExec(invalid *atomic.Bool, cond *sync.Cond, f ExecFunc) <-chan bool { c := make(chan bool, 1) + start := time.Now() w.queue <- weakTransaction{c: c, f: f, invalid: invalid, cond: cond} + w.metrics.observeExecEnqueue(time.Since(start), len(w.queue)) return c } @@ -1010,9 +1019,7 @@ func (w *World) save(f func(*Tx, ChunkPos, *Column)) ExecFunc { func (w *World) saveChunk(_ *Tx, pos ChunkPos, c *Column) { if !w.conf.ReadOnly && c.modified { c.Compact() - if err := w.conf.Provider.StoreColumn(pos, w.conf.Dim, w.columnTo(c, pos)); err != nil { - w.conf.Log.Error("save chunk: "+err.Error(), "X", pos[0], "Z", pos[1]) - } + w.submitSyncColumnSave(pos, w.columnTo(c, pos)) } } @@ -1032,6 +1039,23 @@ func (w *World) closeChunk(tx *Tx, pos ChunkPos, c *Column) { delete(w.chunks, pos) } +// closeChunkAsync persists a chunk snapshot asynchronously and unloads it from memory. +func (w *World) closeChunkAsync(tx *Tx, pos ChunkPos, c *Column) { + if !w.conf.ReadOnly && c.modified { + c.Compact() + w.submitAsyncColumnSave(pos, w.columnTo(c, pos)) + } + w.scheduledUpdates.removeChunk(pos) + // Note: We close c.Entities here because some entities may remove + // themselves from the world in their Close method, which can lead to + // unexpected conditions. + for _, e := range slices.Clone(c.Entities) { + _ = e.mustEntity(tx).Close() + } + clear(c.Entities) + delete(w.chunks, pos) +} + // Close closes the world and saves all chunks currently loaded. func (w *World) Close() error { w.o.Do(w.close) @@ -1050,7 +1074,6 @@ func (w *World) close() { }) close(w.closing) - close(w.prefetchRequests) w.running.Wait() close(w.queueClosing) @@ -1255,7 +1278,7 @@ func (w *World) autoSave() { func (w *World) closeUnusedChunks(tx *Tx) { for pos, c := range w.chunks { if len(c.viewers) == 0 { - w.closeChunk(tx, pos, c) + w.closeChunkAsync(tx, pos, c) } } } From 3fb5120e5bb83189c2c53e7cb97080ed407889a7 Mon Sep 17 00:00:00 2001 From: HashimTheArab Date: Tue, 7 Apr 2026 12:19:12 -0400 Subject: [PATCH 5/5] fix prefetch thing --- server/world/prefetch.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/server/world/prefetch.go b/server/world/prefetch.go index 5346c1c46..faf814a39 100644 --- a/server/world/prefetch.go +++ b/server/world/prefetch.go @@ -72,6 +72,12 @@ func (w *World) installPrefetched(pos ChunkPos, c *chunk.Column, loadErr error) delete(w.prefetchInFlight, pos) w.prefetchMu.Unlock() + if loadErr != nil && !errors.Is(loadErr, leveldb.ErrNotFound) { + // Keep errored loads out of the chunk cache so later accesses can retry the provider. + w.conf.Log.Error("load chunk: "+loadErr.Error(), "X", pos[0], "Z", pos[1]) + return + } + if _, ok := w.chunks[pos]; ok { // Already loaded by some other path. return @@ -86,10 +92,6 @@ func (w *World) installPrefetched(pos ChunkPos, c *chunk.Column, loadErr error) // Spreading light requires neighbouring chunks and must happen with transaction-owned access to chunks. w.calculateLight(pos) - - if loadErr != nil && !errors.Is(loadErr, leveldb.ErrNotFound) { - w.conf.Log.Error("load chunk: "+loadErr.Error(), "X", pos[0], "Z", pos[1]) - } } // requestPrefetch schedules an asynchronous load of the chunk at pos if it isn't already loaded or in-flight.