-
Notifications
You must be signed in to change notification settings - Fork 0
transaction improvement #7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: lunar
Are you sure you want to change the base?
Changes from all commits
ce772a5
0a62ef5
dbb3d27
67809f8
3fb5120
edf7866
04aa2d1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,167 @@ | ||
| package world | ||
|
|
||
| import ( | ||
| "sync/atomic" | ||
| "time" | ||
| ) | ||
|
|
||
| const worldMetricsLogInterval = 10 * time.Second | ||
|
|
||
| type worldMetrics struct { | ||
| execEnqueueCount atomic.Uint64 | ||
| execEnqueueNanos atomic.Uint64 | ||
| execEnqueueMaxNanos atomic.Uint64 | ||
| queueLenMax atomic.Int64 | ||
|
|
||
| txCount atomic.Uint64 | ||
| txNanos atomic.Uint64 | ||
| txMaxNanos atomic.Uint64 | ||
|
|
||
| tickCount atomic.Uint64 | ||
| tickNanos atomic.Uint64 | ||
| tickMaxNanos atomic.Uint64 | ||
|
|
||
| tickEntityNanos atomic.Uint64 | ||
| tickScheduledNanos atomic.Uint64 | ||
| tickRandomNanos atomic.Uint64 | ||
| tickNeighbourNanos atomic.Uint64 | ||
|
|
||
| prefetchDropped atomic.Uint64 | ||
| } | ||
|
|
||
| type worldMetricsSnapshot struct { | ||
| execEnqueueCount uint64 | ||
| execEnqueueNanos uint64 | ||
| execEnqueueMaxNanos uint64 | ||
| queueLenMax int64 | ||
|
|
||
| txCount uint64 | ||
| txNanos uint64 | ||
| txMaxNanos uint64 | ||
|
|
||
| tickCount uint64 | ||
| tickNanos uint64 | ||
| tickMaxNanos uint64 | ||
|
|
||
| tickEntityNanos uint64 | ||
| tickScheduledNanos uint64 | ||
| tickRandomNanos uint64 | ||
| tickNeighbourNanos uint64 | ||
|
|
||
| prefetchDropped uint64 | ||
| } | ||
|
|
||
| func (m *worldMetrics) observeExecEnqueue(d time.Duration, queueLen int) { | ||
| nanos := uint64(d) | ||
| m.execEnqueueCount.Add(1) | ||
| m.execEnqueueNanos.Add(nanos) | ||
| updateMaxUint64(&m.execEnqueueMaxNanos, nanos) | ||
| updateMaxInt64(&m.queueLenMax, int64(queueLen)) | ||
| } | ||
|
|
||
| func (m *worldMetrics) observeTx(d time.Duration) { | ||
| nanos := uint64(d) | ||
| m.txCount.Add(1) | ||
| m.txNanos.Add(nanos) | ||
| updateMaxUint64(&m.txMaxNanos, nanos) | ||
| } | ||
|
|
||
| func (m *worldMetrics) observeTick(total, entity, scheduled, random, neighbour time.Duration) { | ||
| m.tickCount.Add(1) | ||
| m.tickNanos.Add(uint64(total)) | ||
| m.tickEntityNanos.Add(uint64(entity)) | ||
| m.tickScheduledNanos.Add(uint64(scheduled)) | ||
| m.tickRandomNanos.Add(uint64(random)) | ||
| m.tickNeighbourNanos.Add(uint64(neighbour)) | ||
| updateMaxUint64(&m.tickMaxNanos, uint64(total)) | ||
| } | ||
|
|
||
| func (m *worldMetrics) observePrefetchDropped() { | ||
| m.prefetchDropped.Add(1) | ||
| } | ||
|
|
||
| func (m *worldMetrics) snapshotAndReset() worldMetricsSnapshot { | ||
| return worldMetricsSnapshot{ | ||
| execEnqueueCount: m.execEnqueueCount.Swap(0), | ||
| execEnqueueNanos: m.execEnqueueNanos.Swap(0), | ||
| execEnqueueMaxNanos: m.execEnqueueMaxNanos.Swap(0), | ||
| queueLenMax: m.queueLenMax.Swap(0), | ||
| txCount: m.txCount.Swap(0), | ||
| txNanos: m.txNanos.Swap(0), | ||
| txMaxNanos: m.txMaxNanos.Swap(0), | ||
| tickCount: m.tickCount.Swap(0), | ||
| tickNanos: m.tickNanos.Swap(0), | ||
| tickMaxNanos: m.tickMaxNanos.Swap(0), | ||
| tickEntityNanos: m.tickEntityNanos.Swap(0), | ||
| tickScheduledNanos: m.tickScheduledNanos.Swap(0), | ||
| tickRandomNanos: m.tickRandomNanos.Swap(0), | ||
| tickNeighbourNanos: m.tickNeighbourNanos.Swap(0), | ||
| prefetchDropped: m.prefetchDropped.Swap(0), | ||
| } | ||
| } | ||
|
|
||
| func (w *World) metricsLoop() { | ||
| defer w.running.Done() | ||
|
|
||
| t := time.NewTicker(worldMetricsLogInterval) | ||
| defer t.Stop() | ||
|
|
||
| for { | ||
| select { | ||
| case <-w.closing: | ||
| return | ||
| case <-t.C: | ||
| s := w.metrics.snapshotAndReset() | ||
| if s.txCount == 0 && s.tickCount == 0 && s.prefetchDropped == 0 { | ||
| continue | ||
| } | ||
| w.conf.Log.Debug( | ||
| "world perf", | ||
| "window", worldMetricsLogInterval.String(), | ||
| "queue_max", s.queueLenMax, | ||
| "enqueue_avg_ms", avgMillis(s.execEnqueueNanos, s.execEnqueueCount), | ||
| "enqueue_max_ms", millisFromNanos(s.execEnqueueMaxNanos), | ||
| "tx_count", s.txCount, | ||
| "tx_avg_ms", avgMillis(s.txNanos, s.txCount), | ||
| "tx_max_ms", millisFromNanos(s.txMaxNanos), | ||
| "tick_count", s.tickCount, | ||
| "tick_avg_ms", avgMillis(s.tickNanos, s.tickCount), | ||
| "tick_max_ms", millisFromNanos(s.tickMaxNanos), | ||
| "tick_entities_avg_ms", avgMillis(s.tickEntityNanos, s.tickCount), | ||
| "tick_scheduled_avg_ms", avgMillis(s.tickScheduledNanos, s.tickCount), | ||
| "tick_random_avg_ms", avgMillis(s.tickRandomNanos, s.tickCount), | ||
| "tick_neighbour_avg_ms", avgMillis(s.tickNeighbourNanos, s.tickCount), | ||
| "prefetch_dropped", s.prefetchDropped, | ||
| ) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func updateMaxUint64(dst *atomic.Uint64, value uint64) { | ||
| for { | ||
| current := dst.Load() | ||
| if value <= current || dst.CompareAndSwap(current, value) { | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func updateMaxInt64(dst *atomic.Int64, value int64) { | ||
| for { | ||
| current := dst.Load() | ||
| if value <= current || dst.CompareAndSwap(current, value) { | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func avgMillis(total, count uint64) float64 { | ||
| if count == 0 { | ||
| return 0 | ||
| } | ||
| return millisFromNanos(total) / float64(count) | ||
| } | ||
|
|
||
| func millisFromNanos(nanos uint64) float64 { | ||
| return float64(nanos) / float64(time.Millisecond) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| package world | ||
|
|
||
| import ( | ||
| "errors" | ||
|
|
||
| "github.com/df-mc/dragonfly/server/world/chunk" | ||
| "github.com/df-mc/goleveldb/leveldb" | ||
| ) | ||
|
|
||
| const prefetchWorkers = 4 | ||
|
|
||
| // prefetchLoop runs worker goroutines that load columns from the provider and precompute per-chunk lighting outside | ||
| // of the world transaction goroutine. The results are then installed using a short transaction. | ||
| func (w *World) prefetchLoop() { | ||
| for range prefetchWorkers { | ||
| go func() { | ||
| defer w.running.Done() | ||
| for { | ||
| select { | ||
| case <-w.closing: | ||
| return | ||
| case pos, ok := <-w.prefetchRequests: | ||
| if !ok { | ||
| return | ||
| } | ||
| w.prefetchOne(pos) | ||
| } | ||
| } | ||
| }() | ||
| } | ||
| } | ||
|
HashimTheArab marked this conversation as resolved.
|
||
|
|
||
| // prefetchOne performs provider IO and per-chunk light filling, then schedules installation in the world. It must | ||
| // not mutate world state directly. | ||
| func (w *World) prefetchOne(pos ChunkPos) { | ||
| // Do the expensive IO/CPU work off the world transaction goroutine. | ||
| c, err := w.conf.Provider.LoadColumn(pos, w.conf.Dim) | ||
| if errors.Is(err, leveldb.ErrNotFound) { | ||
| // The provider doesn't have a chunk saved at this position, so we generate a new one. | ||
| // Generation is not expected to be common for servers using pre-made maps, but we still handle it. | ||
| w.genMu.Lock() | ||
| col := &chunk.Column{Chunk: chunk.New(airRID, w.Range())} | ||
|
Check failure on line 42 in server/world/prefetch.go
|
||
| w.conf.Generator.GenerateChunk(pos, col.Chunk) | ||
| w.genMu.Unlock() | ||
|
|
||
| c, err = col, nil | ||
|
HashimTheArab marked this conversation as resolved.
|
||
| } else if err != nil { | ||
| // Failed reading from provider. We'll still create an empty chunk so that the world can keep running. | ||
| c = &chunk.Column{Chunk: chunk.New(airRID, w.Range())} | ||
|
Check failure on line 49 in server/world/prefetch.go
|
||
| } | ||
|
|
||
| // Fill light for this chunk only. Cross-chunk spreading is done when installing the chunk (transaction-owned). | ||
| chunk.LightArea([]*chunk.Chunk{c.Chunk}, int(pos[0]), int(pos[1])).Fill() | ||
|
|
||
| select { | ||
| case <-w.closing: | ||
| // World is closing: don't enqueue a transaction that might never run. | ||
| w.prefetchMu.Lock() | ||
| delete(w.prefetchInFlight, pos) | ||
| w.prefetchMu.Unlock() | ||
| return | ||
| default: | ||
| _ = w.Exec(func(tx *Tx) { | ||
| w.installPrefetched(pos, c, err) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func (w *World) installPrefetched(pos ChunkPos, c *chunk.Column, loadErr error) { | ||
| // Mark no longer in-flight, regardless of whether we end up installing. | ||
| w.prefetchMu.Lock() | ||
| delete(w.prefetchInFlight, pos) | ||
| w.prefetchMu.Unlock() | ||
|
|
||
| if loadErr != nil && !errors.Is(loadErr, leveldb.ErrNotFound) { | ||
| // Keep errored loads out of the chunk cache so later accesses can retry the provider. | ||
| w.conf.Log.Error("load chunk: "+loadErr.Error(), "X", pos[0], "Z", pos[1]) | ||
| return | ||
| } | ||
|
|
||
| if _, ok := w.chunks[pos]; ok { | ||
| // Already loaded by some other path. | ||
| return | ||
| } | ||
|
|
||
| col := w.columnFrom(c, pos) | ||
| w.chunks[pos] = col | ||
| for _, e := range col.Entities { | ||
| w.entities[e] = pos | ||
| e.w = w | ||
| } | ||
|
|
||
| // Spreading light requires neighbouring chunks and must happen with transaction-owned access to chunks. | ||
| w.calculateLight(pos) | ||
| } | ||
|
cursor[bot] marked this conversation as resolved.
|
||
|
|
||
| // requestPrefetch schedules an asynchronous load of the chunk at pos if it isn't already loaded or in-flight. | ||
| // This method is intended to be called from within a world transaction. | ||
| func (w *World) requestPrefetch(pos ChunkPos) { | ||
| if _, ok := w.chunks[pos]; ok { | ||
| return | ||
| } | ||
|
|
||
| w.prefetchMu.Lock() | ||
| if _, ok := w.prefetchInFlight[pos]; ok { | ||
| w.prefetchMu.Unlock() | ||
| return | ||
| } | ||
| w.prefetchInFlight[pos] = struct{}{} | ||
| w.prefetchMu.Unlock() | ||
|
|
||
| select { | ||
| case w.prefetchRequests <- pos: | ||
| case <-w.closing: | ||
| w.prefetchMu.Lock() | ||
| delete(w.prefetchInFlight, pos) | ||
| w.prefetchMu.Unlock() | ||
| default: | ||
| // Keep the transaction path non-blocking under load. If the queue is saturated, clear in-flight so we can | ||
| // retry this position later. | ||
| w.prefetchMu.Lock() | ||
| delete(w.prefetchInFlight, pos) | ||
| w.prefetchMu.Unlock() | ||
| w.metrics.observePrefetchDropped() | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.