91 changes: 86 additions & 5 deletions pkg/cardinal/cardinal.go
@@ -1,7 +1,9 @@
package cardinal

import (
"bytes"
"context"
"crypto/sha256"
"os/signal"
"syscall"
"time"
@@ -75,6 +77,11 @@ func NewWorld(opts WorldOptions) (*World, error) {
tel: tel,
}

// Configure disk storage path (disk store created lazily on first disk component registration).
if options.DiskStoragePath != "" {
world.world.SetDiskStoragePath(options.DiskStoragePath)
}

// Set the ECS on-component-register callback (used for introspection).
world.world.OnComponentRegister(func(zero ecs.Component) error {
return world.debug.register("component", zero)
@@ -215,13 +222,41 @@ func (w *World) Tick(ctx context.Context, timestamp time.Time) {

w.debug.recordTick(w.currentTick.height, timestamp)

// Flush disk components (writes modified disk components back to files, clears buffer).
// Note: we intentionally do not fsync after flush. The OS page cache holds the data until
// the next compaction (which does fsync). If the process crashes between flushes, the data
// is recovered from the last snapshot, not from the disk file. This avoids the ~1ms fsync
// latency per tick.
if err := w.world.FlushDisk(); err != nil {
w.tel.Logger.Warn().Err(err).Msg("errors encountered flushing disk components")
}

// Emit events.
if err := w.events.Dispatch(); err != nil {
w.tel.Logger.Warn().Err(err).Msg("errors encountered dispatching events")
}

// Compact disk store (remove dead entries from data file).
// Compaction triggers on two conditions:
// 1. CompactionRate reached (regular cleanup, configurable, 0 = disabled)
// 2. Before snapshot (always, to ensure the file blob in the snapshot has no dead data)
isCompactionTick := w.options.CompactionRate > 0 &&
w.currentTick.height > 0 &&
w.currentTick.height%uint64(w.options.CompactionRate) == 0
isSnapshotTick := w.currentTick.height%uint64(w.options.SnapshotRate) == 0

if isCompactionTick || isSnapshotTick {
if err := w.world.CompactDisk(); err != nil {
// Compaction failure is fatal. The compact() path renames a new file over the old
// one and reopens it. If the reopen fails, the diskStore's file handle is permanently
// broken and all subsequent reads/writes will fail. Continuing would silently corrupt
// or lose data. Panicking triggers shutdown, which persists the last good snapshot.
panic(eris.Wrap(err, "fatal: disk compaction failed"))
}
}

// Publish snapshot.
if isSnapshotTick {
snapshotCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
w.snapshot(snapshotCtx, timestamp)
cancel()
@@ -246,11 +281,30 @@ func (w *World) snapshot(ctx context.Context, timestamp time.Time) {
w.tel.Logger.Warn().Err(err).Msg("failed to marshal world state to bytes")
return
}

// Export raw Bitcask file blob for disk components.
// diskColumn[T].toProto() stores keyDirEntries (offsets) in the ECS proto.
// The actual data is this file blob. Both must be restored together.
diskData, err := w.world.ExportDiskFile()
if err != nil {
w.tel.Logger.Warn().Err(err).Msg("failed to export disk file")
return
}

// Compute SHA-256 checksum for integrity verification on restore.
var diskChecksum []byte
if len(diskData) > 0 {
hash := sha256.Sum256(diskData)
diskChecksum = hash[:]
}

snap := &snapshot.Snapshot{
TickHeight: w.currentTick.height,
Timestamp: timestamp,
Data: data,
DiskState: diskData,
DiskStateChecksum: diskChecksum,
Version: snapshot.CurrentVersion,
}
if err := w.snapshotStorage.Store(ctx, snap); err != nil {
w.tel.Logger.Warn().Err(err).Msg("failed to store snapshot")
@@ -272,7 +326,21 @@ func (w *World) restore(ctx context.Context) error {
return eris.Wrap(err, "failed to load snapshot")
}

// Verify disk state integrity before restoring.
if len(snap.DiskState) > 0 && len(snap.DiskStateChecksum) > 0 {
hash := sha256.Sum256(snap.DiskState)
if !bytes.Equal(hash[:], snap.DiskStateChecksum) {
return eris.New("disk state checksum mismatch: file blob corrupted or from a different snapshot")
}
}

// Restore disk file FIRST so that diskColumn offsets point to valid data.
if err := w.world.ImportDiskFile(snap.DiskState); err != nil {
return eris.Wrap(err, "failed to restore disk file from snapshot")
}

// Unmarshal snapshot bytes into proto and restore ECS world.
// diskColumn[T].fromProto() restores keyDirEntries that point into the file above.
var worldState cardinalv1.WorldState
if err := proto.Unmarshal(snap.Data, &worldState); err != nil {
return eris.Wrap(err, "failed to unmarshal snapshot data")
@@ -297,10 +365,23 @@ func (w *World) shutdown() {

w.tel.Logger.Info().Msg("Shutting down world")

// Flush and compact disk before final snapshot to ensure clean file blob.
if err := w.world.FlushDisk(); err != nil {
w.tel.Logger.Warn().Err(err).Msg("failed to flush disk on shutdown")
}
if err := w.world.CompactDisk(); err != nil {
w.tel.Logger.Warn().Err(err).Msg("failed to compact disk on shutdown")
}

snapshotCtx, snapshotCancel := context.WithTimeout(ctx, 2*time.Second)
w.snapshot(snapshotCtx, time.Now())
snapshotCancel()

// Close disk store file handle.
if err := w.world.CloseDisk(); err != nil {
w.tel.Logger.Error().Err(err).Msg("disk store close error")
}

// Shutdown debug server.
if err := w.debug.Shutdown(ctx); err != nil {
w.tel.Logger.Error().Err(err).Msg("debug server shutdown error")
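Taken together, snapshot() and restore() form a seal/verify pair around the exported file blob. Below is a minimal, self-contained sketch of that pair; snapshotBlob, seal, and verify are illustrative names invented here, not types or functions from this package:

package main

import (
	"bytes"
	"crypto/sha256"
	"errors"
	"fmt"
)

// snapshotBlob is a stand-in for the DiskState / DiskStateChecksum pair
// carried by snapshot.Snapshot.
type snapshotBlob struct {
	DiskState         []byte
	DiskStateChecksum []byte
}

// seal mirrors snapshot(): the checksum is computed only for a non-empty
// blob, so worlds with no disk components carry a nil checksum.
func seal(data []byte) snapshotBlob {
	b := snapshotBlob{DiskState: data}
	if len(data) > 0 {
		sum := sha256.Sum256(data)
		b.DiskStateChecksum = sum[:]
	}
	return b
}

// verify mirrors restore(): it rejects only when both the blob and the
// checksum are present and disagree, so snapshots without a checksum load.
func verify(b snapshotBlob) error {
	if len(b.DiskState) > 0 && len(b.DiskStateChecksum) > 0 {
		sum := sha256.Sum256(b.DiskState)
		if !bytes.Equal(sum[:], b.DiskStateChecksum) {
			return errors.New("disk state checksum mismatch")
		}
	}
	return nil
}

func main() {
	b := seal([]byte("bitcask data file contents"))
	fmt.Println(verify(b)) // <nil>
	b.DiskState[0] ^= 0xFF // simulate a corrupted blob
	fmt.Println(verify(b)) // disk state checksum mismatch
}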
31 changes: 31 additions & 0 deletions pkg/cardinal/config.go
@@ -20,6 +20,23 @@ type WorldOptions struct {
SnapshotRate uint32 // Number of ticks per snapshot
Debug *bool // Enable debug server
NATSConfig *micro.NATSConfig // Optional NATS config override (nil = use env/defaults)
DiskStoragePath string // Local disk path for disk-backed components (empty = disabled)

// Number of ticks between disk compactions. Compaction rewrites the Bitcask file
// without dead entries (old versions from updates and deletes).
//
// Set to 0 to disable periodic compaction. Compaction still runs before every
// snapshot regardless of this setting, to ensure the snapshot file blob is clean.
//
// Formula for choosing a value, targeting a 50% waste-ratio threshold:
// CompactionRate = total_live_entities / updates_per_tick
//
// Example: 10,000 entities, 100 updates per tick -> CompactionRate = 100
// Example: 10,000 entities, 10 updates per tick -> CompactionRate = 1,000
//
// For progression-based systems where data mostly grows and rarely updates,
// set to 0 and rely on pre-snapshot compaction.
CompactionRate uint32
}

// newDefaultWorldOptions creates WorldOptions with default values.
@@ -67,6 +84,12 @@ func (opt *WorldOptions) apply(newOpt WorldOptions) {
if newOpt.NATSConfig != nil {
opt.NATSConfig = newOpt.NATSConfig
}
if newOpt.DiskStoragePath != "" {
opt.DiskStoragePath = newOpt.DiskStoragePath
}
if newOpt.CompactionRate != 0 {
opt.CompactionRate = newOpt.CompactionRate
}
}

// validate checks that all required options are set and valid.
@@ -142,6 +165,12 @@ type worldOptionsEnv struct {

// Enable debug server.
Debug bool `env:"CARDINAL_DEBUG" envDefault:"false"`

// Local disk path for disk-backed components (empty = disabled).
DiskStoragePath string `env:"CARDINAL_DISK_STORAGE_PATH"`

// Number of ticks per disk compaction (0 = disabled).
CompactionRate uint32 `env:"CARDINAL_COMPACTION_RATE"`
}

// loadWorldOptionsEnv loads the world options from environment variables.
@@ -180,5 +209,7 @@ func (cfg *worldOptionsEnv) toOptions() WorldOptions {
SnapshotStorageType: snapshotStorageType,
SnapshotRate: cfg.SnapshotRate,
Debug: &cfg.Debug,
DiskStoragePath: cfg.DiskStoragePath,
CompactionRate: cfg.CompactionRate,
}
}
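The sizing formula in the CompactionRate doc comment is easy to encode as a helper for operators choosing a value. suggestCompactionRate is a hypothetical illustration, not part of the package:

// suggestCompactionRate applies the doc comment's formula: the tick interval
// at which dead entries would reach roughly 50% of live entries.
//
//	CompactionRate = total_live_entities / updates_per_tick
func suggestCompactionRate(liveEntities, updatesPerTick uint64) uint32 {
	if updatesPerTick == 0 {
		return 0 // data never updates: disable, rely on pre-snapshot compaction
	}
	return uint32(liveEntities / updatesPerTick)
}

// suggestCompactionRate(10_000, 100) == 100
// suggestCompactionRate(10_000, 10)  == 1_000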
19 changes: 19 additions & 0 deletions pkg/cardinal/dst.go
@@ -192,6 +192,7 @@
t.Setenv("LOG_LEVEL", "disabled")

debug := false
diskPath := t.TempDir()
w, err := NewWorld(WorldOptions{
Region: "dst",
Organization: "dst",
@@ -201,6 +202,8 @@
SnapshotStorageType: snapshot.StorageTypeNop,
SnapshotRate: cfg.SnapshotRate,
Debug: &debug,
DiskStoragePath: diskPath,
CompactionRate: 50,
})
require.NoError(t, err)

@@ -244,7 +247,7 @@
}
}

// lint (GitHub Actions): func (*dstFixture).logWorldState is unused
func (f *dstFixture) logWorldState(t *testing.T, label string) {
t.Helper()
ws, err := f.world.world.ToProto()
if err != nil {
@@ -382,6 +385,14 @@
cp := *s
cp.Data = make([]byte, len(s.Data))
copy(cp.Data, s.Data)
if s.DiskState != nil {
cp.DiskState = make([]byte, len(s.DiskState))
copy(cp.DiskState, s.DiskState)
}
if s.DiskStateChecksum != nil {
cp.DiskStateChecksum = make([]byte, len(s.DiskStateChecksum))
copy(cp.DiskStateChecksum, s.DiskStateChecksum)
}
m.snap = &cp
return nil
}
@@ -395,6 +406,14 @@
cp := *m.snap
cp.Data = make([]byte, len(m.snap.Data))
copy(cp.Data, m.snap.Data)
if m.snap.DiskState != nil {
cp.DiskState = make([]byte, len(m.snap.DiskState))
copy(cp.DiskState, m.snap.DiskState)
}
if m.snap.DiskStateChecksum != nil {
cp.DiskStateChecksum = make([]byte, len(m.snap.DiskStateChecksum))
copy(cp.DiskStateChecksum, m.snap.DiskStateChecksum)
}

return &cp, nil
}
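Both mock-store methods repeat the same defensive-copy pattern for the new byte fields. If more fields are added, a small helper would keep the two sites in sync; cloneBytes is a hypothetical refactor, not something this PR introduces:

// cloneBytes returns an independent copy of b, preserving nil so that
// "no disk state" and "empty disk state" remain distinguishable.
func cloneBytes(b []byte) []byte {
	if b == nil {
		return nil
	}
	out := make([]byte, len(b))
	copy(out, b)
	return out
}

// Usage in both the Store and Load paths:
//	cp.DiskState = cloneBytes(s.DiskState)
//	cp.DiskStateChecksum = cloneBytes(s.DiskStateChecksum)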
3 changes: 3 additions & 0 deletions pkg/cardinal/internal/ecs/archetype.go
@@ -137,6 +137,9 @@ func (a *archetype) moveEntity(destination *archetype, eid EntityID) {
assert.That(exists, "new entity isn't created in the destination archetype")

// Move entity's components to the new archetype.
// Note: for disk-backed columns (diskColumn[T]), this performs a full disk read +
// msgpack deserialize + re-serialize + buffer. This is expensive. Disk components
// work best on entities with stable archetype composition (no frequent add/remove).
for _, dst := range destination.columns {
for _, src := range a.columns {
if dst.name() == src.name() {
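To make the cost in that comment concrete, here is a rough model of the per-entity work a disk-backed column does on an archetype move. Every identifier below is invented for illustration, with encoding/json standing in for msgpack; the real diskColumn is not part of this diff:

package main

import (
	"encoding/json"
	"fmt"
)

// fakeDiskColumn models only the cost-relevant pieces: an offset-addressed
// data file (a map here) and the write buffer drained by FlushDisk.
type fakeDiskColumn struct {
	file   map[uint64][]byte
	buffer map[uint64][]byte
}

type position struct{ X, Y float64 }

// moveComponent performs the steps the archetype comment describes: a disk
// read, a full decode, a re-encode, a buffered write into the destination,
// and a delete that leaves a dead entry for compaction to reclaim.
func moveComponent(src, dst *fakeDiskColumn, eid uint64) error {
	raw, ok := src.file[eid] // disk read at the keyDir offset
	if !ok {
		return fmt.Errorf("entity %d has no stored component", eid)
	}
	var v position
	if err := json.Unmarshal(raw, &v); err != nil { // deserialize (msgpack in the real code)
		return err
	}
	encoded, err := json.Marshal(v) // re-serialize for the destination column
	if err != nil {
		return err
	}
	dst.buffer[eid] = encoded // buffered until the next flush
	delete(src.file, eid)     // dead entry until the next compaction
	return nil
}

func main() {
	src := &fakeDiskColumn{file: map[uint64][]byte{7: []byte(`{"X":1,"Y":2}`)}, buffer: map[uint64][]byte{}}
	dst := &fakeDiskColumn{file: map[uint64][]byte{}, buffer: map[uint64][]byte{}}
	fmt.Println(moveComponent(src, dst, 7)) // <nil>
}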