Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion server/world/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ func (conf Config) New() *World {
entities: make(map[*EntityHandle]ChunkPos),
viewers: make(map[*Loader]Viewer),
chunks: make(map[ChunkPos]*Column),
saveRequests: make(chan saveRequest, 512),
prefetchRequests: make(chan ChunkPos, 512),
prefetchInFlight: make(map[ChunkPos]struct{}),
queueClosing: make(chan struct{}),
closing: make(chan struct{}),
queue: make(chan transaction, 128),
Expand All @@ -136,12 +139,15 @@ func (conf Config) New() *World {
w.handler.Store(&h)

w.queueing.Add(1)
w.running.Add(2)
w.running.Add(3 + prefetchWorkers + saveWorkers)

t := ticker{interval: time.Second / 20}
go t.tickLoop(w)
go w.autoSave()
go w.handleTransactions()
go w.prefetchLoop()
go w.saveLoop()
go w.metricsLoop()

<-w.Exec(t.tick)
return w
Expand Down
15 changes: 14 additions & 1 deletion server/world/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,20 @@ func (l *Loader) Load(tx *Tx, n int) {
}

pos := l.loadQueue[0]
c := tx.w.chunk(pos)
c, ok := tx.w.chunks[pos]
if !ok {
// Don't block the world transaction goroutine on provider IO and lighting. Instead, schedule a background
// prefetch and retry this chunk later.
tx.w.requestPrefetch(pos)
if len(l.loadQueue) > 1 {
// Rotate this chunk to the end so we can schedule prefetches for additional chunks this tick.
l.loadQueue = append(l.loadQueue[1:], pos)
} else {
// No other work can be done until this chunk finishes loading.
break
}
continue
Comment thread
HashimTheArab marked this conversation as resolved.
}

l.viewer.ViewChunk(pos, l.w.Dimension(), c.BlockEntities, c.Chunk)
l.w.addViewer(tx, c, l)
Expand Down
167 changes: 167 additions & 0 deletions server/world/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package world

import (
"sync/atomic"
"time"
)

const worldMetricsLogInterval = 10 * time.Second

type worldMetrics struct {
execEnqueueCount atomic.Uint64
execEnqueueNanos atomic.Uint64
execEnqueueMaxNanos atomic.Uint64
queueLenMax atomic.Int64

txCount atomic.Uint64
txNanos atomic.Uint64
txMaxNanos atomic.Uint64

tickCount atomic.Uint64
tickNanos atomic.Uint64
tickMaxNanos atomic.Uint64

tickEntityNanos atomic.Uint64
tickScheduledNanos atomic.Uint64
tickRandomNanos atomic.Uint64
tickNeighbourNanos atomic.Uint64

prefetchDropped atomic.Uint64
}

type worldMetricsSnapshot struct {
execEnqueueCount uint64
execEnqueueNanos uint64
execEnqueueMaxNanos uint64
queueLenMax int64

txCount uint64
txNanos uint64
txMaxNanos uint64

tickCount uint64
tickNanos uint64
tickMaxNanos uint64

tickEntityNanos uint64
tickScheduledNanos uint64
tickRandomNanos uint64
tickNeighbourNanos uint64

prefetchDropped uint64
}

func (m *worldMetrics) observeExecEnqueue(d time.Duration, queueLen int) {
nanos := uint64(d)
m.execEnqueueCount.Add(1)
m.execEnqueueNanos.Add(nanos)
updateMaxUint64(&m.execEnqueueMaxNanos, nanos)
updateMaxInt64(&m.queueLenMax, int64(queueLen))
}

func (m *worldMetrics) observeTx(d time.Duration) {
nanos := uint64(d)
m.txCount.Add(1)
m.txNanos.Add(nanos)
updateMaxUint64(&m.txMaxNanos, nanos)
}

func (m *worldMetrics) observeTick(total, entity, scheduled, random, neighbour time.Duration) {
m.tickCount.Add(1)
m.tickNanos.Add(uint64(total))
m.tickEntityNanos.Add(uint64(entity))
m.tickScheduledNanos.Add(uint64(scheduled))
m.tickRandomNanos.Add(uint64(random))
m.tickNeighbourNanos.Add(uint64(neighbour))
updateMaxUint64(&m.tickMaxNanos, uint64(total))
}

func (m *worldMetrics) observePrefetchDropped() {
m.prefetchDropped.Add(1)
}

func (m *worldMetrics) snapshotAndReset() worldMetricsSnapshot {
return worldMetricsSnapshot{
execEnqueueCount: m.execEnqueueCount.Swap(0),
execEnqueueNanos: m.execEnqueueNanos.Swap(0),
execEnqueueMaxNanos: m.execEnqueueMaxNanos.Swap(0),
queueLenMax: m.queueLenMax.Swap(0),
txCount: m.txCount.Swap(0),
txNanos: m.txNanos.Swap(0),
txMaxNanos: m.txMaxNanos.Swap(0),
tickCount: m.tickCount.Swap(0),
tickNanos: m.tickNanos.Swap(0),
tickMaxNanos: m.tickMaxNanos.Swap(0),
tickEntityNanos: m.tickEntityNanos.Swap(0),
tickScheduledNanos: m.tickScheduledNanos.Swap(0),
tickRandomNanos: m.tickRandomNanos.Swap(0),
tickNeighbourNanos: m.tickNeighbourNanos.Swap(0),
prefetchDropped: m.prefetchDropped.Swap(0),
}
}

func (w *World) metricsLoop() {
defer w.running.Done()

t := time.NewTicker(worldMetricsLogInterval)
defer t.Stop()

for {
select {
case <-w.closing:
return
case <-t.C:
s := w.metrics.snapshotAndReset()
if s.txCount == 0 && s.tickCount == 0 && s.prefetchDropped == 0 {
continue
}
w.conf.Log.Debug(
"world perf",
"window", worldMetricsLogInterval.String(),
"queue_max", s.queueLenMax,
"enqueue_avg_ms", avgMillis(s.execEnqueueNanos, s.execEnqueueCount),
"enqueue_max_ms", millisFromNanos(s.execEnqueueMaxNanos),
"tx_count", s.txCount,
"tx_avg_ms", avgMillis(s.txNanos, s.txCount),
"tx_max_ms", millisFromNanos(s.txMaxNanos),
"tick_count", s.tickCount,
"tick_avg_ms", avgMillis(s.tickNanos, s.tickCount),
"tick_max_ms", millisFromNanos(s.tickMaxNanos),
"tick_entities_avg_ms", avgMillis(s.tickEntityNanos, s.tickCount),
"tick_scheduled_avg_ms", avgMillis(s.tickScheduledNanos, s.tickCount),
"tick_random_avg_ms", avgMillis(s.tickRandomNanos, s.tickCount),
"tick_neighbour_avg_ms", avgMillis(s.tickNeighbourNanos, s.tickCount),
"prefetch_dropped", s.prefetchDropped,
)
}
}
}

func updateMaxUint64(dst *atomic.Uint64, value uint64) {
for {
current := dst.Load()
if value <= current || dst.CompareAndSwap(current, value) {
return
}
}
}

func updateMaxInt64(dst *atomic.Int64, value int64) {
for {
current := dst.Load()
if value <= current || dst.CompareAndSwap(current, value) {
return
}
}
}

func avgMillis(total, count uint64) float64 {
if count == 0 {
return 0
}
return millisFromNanos(total) / float64(count)
}

func millisFromNanos(nanos uint64) float64 {
return float64(nanos) / float64(time.Millisecond)
}
126 changes: 126 additions & 0 deletions server/world/prefetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package world

import (
"errors"

"github.com/df-mc/dragonfly/server/world/chunk"
"github.com/df-mc/goleveldb/leveldb"
)

const prefetchWorkers = 4

// prefetchLoop runs worker goroutines that load columns from the provider and precompute per-chunk lighting outside
// of the world transaction goroutine. The results are then installed using a short transaction.
func (w *World) prefetchLoop() {
for range prefetchWorkers {
go func() {
defer w.running.Done()
for {
select {
case <-w.closing:
return
case pos, ok := <-w.prefetchRequests:
if !ok {
return
}
w.prefetchOne(pos)
}
}
}()
}
}
Comment thread
HashimTheArab marked this conversation as resolved.

// prefetchOne performs provider IO and per-chunk light filling, then schedules installation in the world. It must
// not mutate world state directly.
func (w *World) prefetchOne(pos ChunkPos) {
// Do the expensive IO/CPU work off the world transaction goroutine.
c, err := w.conf.Provider.LoadColumn(pos, w.conf.Dim)
if errors.Is(err, leveldb.ErrNotFound) {
// The provider doesn't have a chunk saved at this position, so we generate a new one.
// Generation is not expected to be common for servers using pre-made maps, but we still handle it.
w.genMu.Lock()
col := &chunk.Column{Chunk: chunk.New(airRID, w.Range())}

Check failure on line 42 in server/world/prefetch.go

View workflow job for this annotation

GitHub Actions / Build

undefined: airRID

Check failure on line 42 in server/world/prefetch.go

View workflow job for this annotation

GitHub Actions / Deploy

undefined: airRID

Check failure on line 42 in server/world/prefetch.go

View workflow job for this annotation

GitHub Actions / Build

undefined: airRID
w.conf.Generator.GenerateChunk(pos, col.Chunk)
w.genMu.Unlock()

c, err = col, nil
Comment thread
HashimTheArab marked this conversation as resolved.
} else if err != nil {
// Failed reading from provider. We'll still create an empty chunk so that the world can keep running.
c = &chunk.Column{Chunk: chunk.New(airRID, w.Range())}

Check failure on line 49 in server/world/prefetch.go

View workflow job for this annotation

GitHub Actions / Build

undefined: airRID (typecheck)

Check failure on line 49 in server/world/prefetch.go

View workflow job for this annotation

GitHub Actions / Deploy

undefined: airRID

Check failure on line 49 in server/world/prefetch.go

View workflow job for this annotation

GitHub Actions / Build

undefined: airRID (typecheck)
}

// Fill light for this chunk only. Cross-chunk spreading is done when installing the chunk (transaction-owned).
chunk.LightArea([]*chunk.Chunk{c.Chunk}, int(pos[0]), int(pos[1])).Fill()

select {
case <-w.closing:
// World is closing: don't enqueue a transaction that might never run.
w.prefetchMu.Lock()
delete(w.prefetchInFlight, pos)
w.prefetchMu.Unlock()
return
default:
_ = w.Exec(func(tx *Tx) {
w.installPrefetched(pos, c, err)
})
}
}

func (w *World) installPrefetched(pos ChunkPos, c *chunk.Column, loadErr error) {
// Mark no longer in-flight, regardless of whether we end up installing.
w.prefetchMu.Lock()
delete(w.prefetchInFlight, pos)
w.prefetchMu.Unlock()

if loadErr != nil && !errors.Is(loadErr, leveldb.ErrNotFound) {
// Keep errored loads out of the chunk cache so later accesses can retry the provider.
w.conf.Log.Error("load chunk: "+loadErr.Error(), "X", pos[0], "Z", pos[1])
return
}

if _, ok := w.chunks[pos]; ok {
// Already loaded by some other path.
return
}

col := w.columnFrom(c, pos)
w.chunks[pos] = col
for _, e := range col.Entities {
w.entities[e] = pos
e.w = w
}

// Spreading light requires neighbouring chunks and must happen with transaction-owned access to chunks.
w.calculateLight(pos)
}
Comment thread
cursor[bot] marked this conversation as resolved.

// requestPrefetch schedules an asynchronous load of the chunk at pos if it isn't already loaded or in-flight.
// This method is intended to be called from within a world transaction.
func (w *World) requestPrefetch(pos ChunkPos) {
if _, ok := w.chunks[pos]; ok {
return
}

w.prefetchMu.Lock()
if _, ok := w.prefetchInFlight[pos]; ok {
w.prefetchMu.Unlock()
return
}
w.prefetchInFlight[pos] = struct{}{}
w.prefetchMu.Unlock()

select {
case w.prefetchRequests <- pos:
case <-w.closing:
w.prefetchMu.Lock()
delete(w.prefetchInFlight, pos)
w.prefetchMu.Unlock()
default:
// Keep the transaction path non-blocking under load. If the queue is saturated, clear in-flight so we can
// retry this position later.
w.prefetchMu.Lock()
delete(w.prefetchInFlight, pos)
w.prefetchMu.Unlock()
w.metrics.observePrefetchDropped()
}
}
Loading
Loading