diff --git a/pkg/cardinal/cardinal.go b/pkg/cardinal/cardinal.go index f61821d1c..c5a1329c1 100644 --- a/pkg/cardinal/cardinal.go +++ b/pkg/cardinal/cardinal.go @@ -1,7 +1,9 @@ package cardinal import ( + "bytes" "context" + "crypto/sha256" "os/signal" "syscall" "time" @@ -75,6 +77,11 @@ func NewWorld(opts WorldOptions) (*World, error) { tel: tel, } + // Configure disk storage path (disk store created lazily on first disk component registration). + if options.DiskStoragePath != "" { + world.world.SetDiskStoragePath(options.DiskStoragePath) + } + // Set ECS on componet register callback (used for introspect). world.world.OnComponentRegister(func(zero ecs.Component) error { return world.debug.register("component", zero) @@ -215,13 +222,41 @@ func (w *World) Tick(ctx context.Context, timestamp time.Time) { w.debug.recordTick(w.currentTick.height, timestamp) + // Flush disk components (writes modified disk components back to files, clears buffer). + // Note: we intentionally do not fsync after flush. The OS page cache holds the data until + // the next compaction (which does fsync). If the process crashes between flushes, the data + // is recovered from the last snapshot, not from the disk file. This avoids the ~1ms fsync + // latency per tick. + if err := w.world.FlushDisk(); err != nil { + w.tel.Logger.Warn().Err(err).Msg("errors encountered flushing disk components") + } + // Emit events. if err := w.events.Dispatch(); err != nil { w.tel.Logger.Warn().Err(err).Msg("errors encountered dispatching events") } + // Compact disk store (remove dead entries from data file). + // Compaction triggers on two conditions: + // 1. CompactionRate reached (regular cleanup, configurable, 0 = disabled) + // 2. Before snapshot (always, to ensure the file blob in the snapshot has no dead data) + isCompactionTick := w.options.CompactionRate > 0 && + w.currentTick.height > 0 && + w.currentTick.height%uint64(w.options.CompactionRate) == 0 + isSnapshotTick := w.currentTick.height%uint64(w.options.SnapshotRate) == 0 + + if isCompactionTick || isSnapshotTick { + if err := w.world.CompactDisk(); err != nil { + // Compaction failure is fatal. The compact() path renames a new file over the old + // one and reopens it. If the reopen fails, the diskStore's file handle is permanently + // broken and all subsequent reads/writes will fail. Continuing would silently corrupt + // or lose data. Panicking triggers shutdown, which persists the last good snapshot. + panic(eris.Wrap(err, "fatal: disk compaction failed")) + } + } + // Publish snapshot. - if w.currentTick.height%uint64(w.options.SnapshotRate) == 0 { + if isSnapshotTick { snapshotCtx, cancel := context.WithTimeout(ctx, 2*time.Second) w.snapshot(snapshotCtx, timestamp) cancel() @@ -246,11 +281,30 @@ func (w *World) snapshot(ctx context.Context, timestamp time.Time) { w.tel.Logger.Warn().Err(err).Msg("failed to marshal world state to bytes") return } + + // Export raw Bitcask file blob for disk components. + // diskColumn[T].toProto() stores keyDirEntries (offsets) in the ECS proto. + // The actual data is this file blob. Both must be restored together. + diskData, err := w.world.ExportDiskFile() + if err != nil { + w.tel.Logger.Warn().Err(err).Msg("failed to export disk file") + return + } + + // Compute SHA-256 checksum for integrity verification on restore. + var diskChecksum []byte + if len(diskData) > 0 { + hash := sha256.Sum256(diskData) + diskChecksum = hash[:] + } + snap := &snapshot.Snapshot{ - TickHeight: w.currentTick.height, - Timestamp: timestamp, - Data: data, - Version: snapshot.CurrentVersion, + TickHeight: w.currentTick.height, + Timestamp: timestamp, + Data: data, + DiskState: diskData, + DiskStateChecksum: diskChecksum, + Version: snapshot.CurrentVersion, } if err := w.snapshotStorage.Store(ctx, snap); err != nil { w.tel.Logger.Warn().Err(err).Msg("failed to store snapshot") @@ -272,7 +326,21 @@ func (w *World) restore(ctx context.Context) error { return eris.Wrap(err, "failed to load snapshot") } + // Verify disk state integrity before restoring. + if len(snap.DiskState) > 0 && len(snap.DiskStateChecksum) > 0 { + hash := sha256.Sum256(snap.DiskState) + if !bytes.Equal(hash[:], snap.DiskStateChecksum) { + return eris.New("disk state checksum mismatch: file blob corrupted or from a different snapshot") + } + } + + // Restore disk file FIRST so that diskColumn offsets point to valid data. + if err := w.world.ImportDiskFile(snap.DiskState); err != nil { + return eris.Wrap(err, "failed to restore disk file from snapshot") + } + // Unmarshal snapshot bytes into proto and restore ECS world. + // diskColumn[T].fromProto() restores keyDirEntries that point into the file above. var worldState cardinalv1.WorldState if err := proto.Unmarshal(snap.Data, &worldState); err != nil { return eris.Wrap(err, "failed to unmarshal snapshot data") @@ -297,10 +365,23 @@ func (w *World) shutdown() { w.tel.Logger.Info().Msg("Shutting down world") + // Flush and compact disk before final snapshot to ensure clean file blob. + if err := w.world.FlushDisk(); err != nil { + w.tel.Logger.Warn().Err(err).Msg("failed to flush disk on shutdown") + } + if err := w.world.CompactDisk(); err != nil { + w.tel.Logger.Warn().Err(err).Msg("failed to compact disk on shutdown") + } + snapshotCtx, snapshotCancel := context.WithTimeout(ctx, 2*time.Second) w.snapshot(snapshotCtx, time.Now()) snapshotCancel() + // Close disk store file handle. + if err := w.world.CloseDisk(); err != nil { + w.tel.Logger.Error().Err(err).Msg("disk store close error") + } + // Shutdown debug server. if err := w.debug.Shutdown(ctx); err != nil { w.tel.Logger.Error().Err(err).Msg("debug server shutdown error") diff --git a/pkg/cardinal/config.go b/pkg/cardinal/config.go index 935f913f4..0fb2754cd 100644 --- a/pkg/cardinal/config.go +++ b/pkg/cardinal/config.go @@ -20,6 +20,23 @@ type WorldOptions struct { SnapshotRate uint32 // Number of ticks per snapshot Debug *bool // Enable debug server NATSConfig *micro.NATSConfig // Optional NATS config override (nil = use env/defaults) + DiskStoragePath string // Local disk path for disk-backed components (empty = disabled) + + // Number of ticks between disk compactions. Compaction rewrites the Bitcask file + // without dead entries (old versions from updates and deletes). + // + // Set to 0 to disable periodic compaction. Compaction still runs before every + // snapshot regardless of this setting, to ensure the snapshot file blob is clean. + // + // Formula for choosing a value based on 50% waste ratio threshold: + // CompactionRate = total_live_entities / updates_per_tick + // + // Example: 10,000 entities, 100 updates per tick -> CompactionRate = 100 + // Example: 10,000 entities, 10 updates per tick -> CompactionRate = 1,000 + // + // For progression-based systems where data mostly grows and rarely updates, + // set to 0 and rely on pre-snapshot compaction. + CompactionRate uint32 } // newDefaultWorldOptions creates WorldOptions with default values. @@ -67,6 +84,12 @@ func (opt *WorldOptions) apply(newOpt WorldOptions) { if newOpt.NATSConfig != nil { opt.NATSConfig = newOpt.NATSConfig } + if newOpt.DiskStoragePath != "" { + opt.DiskStoragePath = newOpt.DiskStoragePath + } + if newOpt.CompactionRate != 0 { + opt.CompactionRate = newOpt.CompactionRate + } } // validate checks that all required options are set and valid. @@ -142,6 +165,12 @@ type worldOptionsEnv struct { // Enable debug server. Debug bool `env:"CARDINAL_DEBUG" envDefault:"false"` + + // Local disk path for disk-backed components (empty = disabled). + DiskStoragePath string `env:"CARDINAL_DISK_STORAGE_PATH"` + + // Number of ticks per disk compaction (0 = disabled). + CompactionRate uint32 `env:"CARDINAL_COMPACTION_RATE"` } // loadWorldOptionsEnv loads the world options from environment variables. @@ -180,5 +209,7 @@ func (cfg *worldOptionsEnv) toOptions() WorldOptions { SnapshotStorageType: snapshotStorageType, SnapshotRate: cfg.SnapshotRate, Debug: &cfg.Debug, + DiskStoragePath: cfg.DiskStoragePath, + CompactionRate: cfg.CompactionRate, } } diff --git a/pkg/cardinal/dst.go b/pkg/cardinal/dst.go index 96753683e..910bf1611 100644 --- a/pkg/cardinal/dst.go +++ b/pkg/cardinal/dst.go @@ -192,6 +192,7 @@ func newDSTFixture(t *testing.T, cfg dstConfig, setup DSTSetupFunc) *dstFixture t.Setenv("LOG_LEVEL", "disabled") debug := false + diskPath := t.TempDir() w, err := NewWorld(WorldOptions{ Region: "dst", Organization: "dst", @@ -201,6 +202,8 @@ func newDSTFixture(t *testing.T, cfg dstConfig, setup DSTSetupFunc) *dstFixture SnapshotStorageType: snapshot.StorageTypeNop, SnapshotRate: cfg.SnapshotRate, Debug: &debug, + DiskStoragePath: diskPath, + CompactionRate: 50, }) require.NoError(t, err) @@ -382,6 +385,14 @@ func (m *memSnapshotStorage) Store(_ context.Context, s *snapshot.Snapshot) erro cp := *s cp.Data = make([]byte, len(s.Data)) copy(cp.Data, s.Data) + if s.DiskState != nil { + cp.DiskState = make([]byte, len(s.DiskState)) + copy(cp.DiskState, s.DiskState) + } + if s.DiskStateChecksum != nil { + cp.DiskStateChecksum = make([]byte, len(s.DiskStateChecksum)) + copy(cp.DiskStateChecksum, s.DiskStateChecksum) + } m.snap = &cp return nil } @@ -395,6 +406,14 @@ func (m *memSnapshotStorage) Load(_ context.Context) (*snapshot.Snapshot, error) cp := *m.snap cp.Data = make([]byte, len(m.snap.Data)) copy(cp.Data, m.snap.Data) + if m.snap.DiskState != nil { + cp.DiskState = make([]byte, len(m.snap.DiskState)) + copy(cp.DiskState, m.snap.DiskState) + } + if m.snap.DiskStateChecksum != nil { + cp.DiskStateChecksum = make([]byte, len(m.snap.DiskStateChecksum)) + copy(cp.DiskStateChecksum, m.snap.DiskStateChecksum) + } return &cp, nil } diff --git a/pkg/cardinal/internal/ecs/archetype.go b/pkg/cardinal/internal/ecs/archetype.go index 845c0a75a..3dfb3bd64 100644 --- a/pkg/cardinal/internal/ecs/archetype.go +++ b/pkg/cardinal/internal/ecs/archetype.go @@ -137,6 +137,9 @@ func (a *archetype) moveEntity(destination *archetype, eid EntityID) { assert.That(exists, "new entity isn't created in the destination archetype") // Move entity's components to the new archetype. + // Note: for disk-backed columns (diskColumn[T]), this performs a full disk read + + // msgpack deserialize + re-serialize + buffer. This is expensive. Disk components + // work best on entities with stable archetype composition (no frequent add/remove). for _, dst := range destination.columns { for _, src := range a.columns { if dst.name() == src.name() { diff --git a/pkg/cardinal/internal/ecs/column_disk.go b/pkg/cardinal/internal/ecs/column_disk.go new file mode 100644 index 000000000..89e787fb3 --- /dev/null +++ b/pkg/cardinal/internal/ecs/column_disk.go @@ -0,0 +1,247 @@ +package ecs + +import ( + "encoding/binary" + + "github.com/argus-labs/world-engine/pkg/assert" + cardinalv1 "github.com/argus-labs/world-engine/proto/gen/go/worldengine/cardinal/v1" + "github.com/rotisserie/eris" + "github.com/shamaton/msgpack/v3" +) + +// keyDirEntry holds the location of a component's data in the Bitcask file. +// Stored in the diskColumn's slice instead of the actual component data. +type keyDirEntry struct { + offset int64 // byte offset in the data file + size uint32 // byte length of the serialized value +} + +// diskColumn stores component data on disk via a Bitcask file. +// Implements abstractColumn with the same interface as column[T]. +// +// Instead of storing []T in memory, it stores []keyDirEntry (12 bytes per row). +// The actual component data is serialized (MessagePack) and stored in the disk file. +// On Get, the entry's offset is used to ReadAt from the file, then deserialized. +// On Set, the data is buffered for the current tick and flushed at tick end. +type diskColumn[T Component] struct { + compName string + entries []keyDirEntry + + // Reference to the shared disk store (file handle). + store *diskStore + + // Per-tick buffer for modified rows. Maps row index to serialized bytes. + // Cleared after flush. + pendingWrites map[int][]byte +} + +var _ abstractColumn = &diskColumn[Component]{} + +func newDiskColumn[T Component](store *diskStore) *diskColumn[T] { + var zero T + return &diskColumn[T]{ + compName: zero.Name(), + entries: make([]keyDirEntry, 0, columnCapacity), + store: store, + pendingWrites: make(map[int][]byte), + } +} + +func newDiskColumnFactory[T Component](store *diskStore) columnFactory { + return func() abstractColumn { + return newDiskColumn[T](store) + } +} + +func (c *diskColumn[T]) len() int { + return len(c.entries) +} + +func (c *diskColumn[T]) name() string { + return c.compName +} + +// extend adds a new row with an empty entry (no data on disk yet). +func (c *diskColumn[T]) extend() { + c.entries = append(c.entries, keyDirEntry{offset: -1, size: 0}) +} + +// set serializes the component and buffers it for flush. +func (c *diskColumn[T]) set(row int, component T) { + assert.That(row < len(c.entries), "disk column: row out of bounds") + + data, err := msgpack.Marshal(component) + assert.That(err == nil, "disk column: failed to serialize component") + + c.pendingWrites[row] = data +} + +func (c *diskColumn[T]) setAbstract(row int, component Component) { + concrete, ok := component.(T) + assert.That(ok, "disk column: wrong component type") + c.set(row, concrete) +} + +// get returns the component for the given row. +// Checks pending writes first, then reads from the disk file. +// +// Disk I/O errors (file read, msgpack deserialize) are treated as invariant violations, +// same as out-of-bounds checks in column[T]. If the file is corrupted or the fd is closed, +// the game state is unrecoverable and panicking is the correct behavior. +func (c *diskColumn[T]) get(row int) T { + assert.That(row < len(c.entries), "disk column: row out of bounds") + + // Check pending writes first (buffered this tick). + if data, ok := c.pendingWrites[row]; ok { + var val T + err := msgpack.Unmarshal(data, &val) + assert.That(err == nil, "disk column: failed to deserialize from buffer") + return val + } + + // Read from disk file. + entry := c.entries[row] + assert.That(entry.offset >= 0, "disk column: reading unwritten entry") + + value, err := c.store.readRecord(entry) + assert.That(err == nil, "disk column: failed to read from file") + + var val T + err = msgpack.Unmarshal(value, &val) + assert.That(err == nil, "disk column: failed to deserialize from file") + + return val +} + +func (c *diskColumn[T]) getAbstract(row int) Component { + return c.get(row) +} + +// remove swaps the last entry with the row to remove (same as column[T]). +func (c *diskColumn[T]) remove(row int) { + assert.That(row < len(c.entries), "disk column: tried to remove out of bounds") + + lastIndex := len(c.entries) - 1 + + if row != lastIndex { + // Swap entry. + c.entries[row] = c.entries[lastIndex] + // Swap pending write if the last row has one. + if data, ok := c.pendingWrites[lastIndex]; ok { + c.pendingWrites[row] = data + delete(c.pendingWrites, lastIndex) + } else { + delete(c.pendingWrites, row) + } + } else { + delete(c.pendingWrites, row) + } + + c.entries = c.entries[:lastIndex] +} + +// flush writes all pending data to the disk file and updates the entries. +func (c *diskColumn[T]) flush() error { + + for row, data := range c.pendingWrites { + offset, err := c.store.appendRecord(data) + if err != nil { + return eris.Wrapf(err, "disk column: failed to flush row %d", row) + } + c.entries[row] = keyDirEntry{ + offset: offset, + size: uint32(len(data)), + } + } + clear(c.pendingWrites) + return nil +} + +// collectLiveRecords returns compact records for all entries with data on disk. +// Each record includes the row index so applyCompactedOffsets can write back +// the new offsets after compaction. Called at tick boundary (after systems finish). +func (c *diskColumn[T]) collectLiveRecords() ([]compactRecord, error) { + var records []compactRecord + for i := range c.entries { + if c.entries[i].offset < 0 { + continue + } + + value, err := c.store.readRecord(c.entries[i]) + if err != nil { + return nil, eris.Wrapf(err, "failed to read record at row %d during compaction", i) + } + + records = append(records, compactRecord{ + value: value, + row: i, + }) + } + return records, nil +} + +// applyCompactedOffsets writes the new offsets from compacted records back into entries. +// Called after compact() has updated the offset/size fields in each record. +func (c *diskColumn[T]) applyCompactedOffsets(records []compactRecord) { + for _, rec := range records { + c.entries[rec.row] = keyDirEntry{ + offset: rec.offset, + size: rec.size, + } + } +} + +// toProto serializes the disk column's keyDirEntries for snapshot. +// The actual component data lives in the Bitcask file, which is snapshotted separately +// as a raw blob. This method only stores the index (offset + size per row). +// +// Note: the Bitcask file may contain dead data from previous updates/deletes. +// This does not affect correctness on restore because the keyDirEntries point to the +// correct live offsets. Dead data is wasted bytes that the next compaction cleans up. +// For smaller snapshots, trigger compaction before snapshot. +func (c *diskColumn[T]) toProto() (*cardinalv1.Column, error) { + + // Serialize keyDirEntries as the "components" field. + // Each entry is encoded as 12 bytes: offset(8) + size(4). + entryData := make([][]byte, len(c.entries)) + for i, entry := range c.entries { + buf := make([]byte, 12) + binary.LittleEndian.PutUint64(buf[0:8], uint64(entry.offset)) + binary.LittleEndian.PutUint32(buf[8:12], entry.size) + entryData[i] = buf + } + + return &cardinalv1.Column{ + ComponentName: c.compName, + Components: entryData, + }, nil +} + +// fromProto restores the disk column's keyDirEntries from a snapshot. +// The Bitcask file must already be restored before calling this. +// The offsets in the entries point directly into the restored file. +func (c *diskColumn[T]) fromProto(pb *cardinalv1.Column) error { + if pb == nil { + return eris.New("protobuf column is nil") + } + if pb.GetComponentName() != c.compName { + return eris.Errorf("component name mismatch: expected %s, got %s", c.compName, pb.GetComponentName()) + } + + + c.entries = make([]keyDirEntry, len(pb.GetComponents())) + for i, buf := range pb.GetComponents() { + if len(buf) < 12 { + return eris.Errorf("invalid keyDirEntry at index %d: expected 12 bytes, got %d", i, len(buf)) + } + offset := int64(binary.LittleEndian.Uint64(buf[0:8])) + size := binary.LittleEndian.Uint32(buf[8:12]) + + c.entries[i] = keyDirEntry{ + offset: offset, + size: size, + } + } + + return nil +} diff --git a/pkg/cardinal/internal/ecs/component.go b/pkg/cardinal/internal/ecs/component.go index 358603502..4deccaf51 100644 --- a/pkg/cardinal/internal/ecs/component.go +++ b/pkg/cardinal/internal/ecs/component.go @@ -3,12 +3,18 @@ package ecs import ( "math" "regexp" + "strings" "github.com/argus-labs/world-engine/pkg/assert" "github.com/argus-labs/world-engine/pkg/cardinal/internal/schema" "github.com/rotisserie/eris" ) +// diskSuffix is the reserved suffix for disk-backed components. +// Engineers signal disk storage by appending this to their component Name(). +// The '@' character is rejected by normal name validation, making collisions impossible. +const diskSuffix = "@disk" + // Component is the interface that all components must implement. // Components are pure data containers that can be attached to entities. type Component interface { //nolint:iface // may extend later @@ -53,15 +59,19 @@ func newComponentManager() componentManager { var componentNamePattern = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`) // validateComponentName validates that a component name follows expr identifier rules. +// Names may optionally end with "@disk" to signal disk-backed storage. The base name +// (before the @) is validated against the standard identifier pattern. // See: https://expr-lang.org/docs/language-definition#variables func validateComponentName(name string) error { if name == "" { return eris.New("component name cannot be empty") } - if !componentNamePattern.MatchString(name) { + baseName, _ := parseComponentName(name) + + if !componentNamePattern.MatchString(baseName) { return eris.Errorf( - "component name '%s' is invalid: must start with a letter or underscore, "+ + "component name '%s' is invalid: base name must start with a letter or underscore, "+ "and contain only letters, digits, and underscores", name, ) @@ -70,6 +80,22 @@ func validateComponentName(name string) error { return nil } +// parseComponentName splits a component name into its base name and whether it's disk-backed. +// "match_history@disk" → ("match_history", true) +// "match_history" → ("match_history", false) +func parseComponentName(name string) (baseName string, isDisk bool) { + if strings.HasSuffix(name, diskSuffix) { + return strings.TrimSuffix(name, diskSuffix), true + } + + // Reject any other use of '@' in the name. + if strings.Contains(name, "@") { + return name, false // will fail regex validation + } + + return name, false +} + // register registers a new component type and returns its ID. // If the component is already registered, no-op. func (cm *componentManager) register(name string, factory columnFactory) (ComponentID, error) { @@ -107,12 +133,34 @@ func (cm *componentManager) getID(name string) (ComponentID, error) { } // RegisterComponent registers a component type with the world. +// Storage mode is determined by the component's Name() return value: +// - "match_history" → in-memory (default) +// - "match_history@disk" → disk-backed (requires DiskStoragePath in WorldOptions) func RegisterComponent[T Component](world *World) (ComponentID, error) { var zero T + name := zero.Name() + _, isDisk := parseComponentName(name) + if world.onComponentRegister != nil { if err := world.onComponentRegister(zero); err != nil { return 0, eris.Wrap(err, "component registered callback failed") } } - return world.state.components.register(zero.Name(), newColumnFactory[T]()) + + if isDisk { + if world.diskStoragePath == "" { + return 0, eris.Errorf("DiskStoragePath must be set in WorldOptions to use disk component %q", name) + } + // Lazy-create disk store on first disk component registration. + if world.disk == nil { + ds, err := newDiskStore(world.diskStoragePath) + if err != nil { + return 0, eris.Wrap(err, "failed to create disk store") + } + world.disk = ds + } + return world.state.components.register(name, newDiskColumnFactory[T](world.disk)) + } + + return world.state.components.register(name, newColumnFactory[T]()) } diff --git a/pkg/cardinal/internal/ecs/disk_store.go b/pkg/cardinal/internal/ecs/disk_store.go new file mode 100644 index 000000000..9c746a6b2 --- /dev/null +++ b/pkg/cardinal/internal/ecs/disk_store.go @@ -0,0 +1,227 @@ +package ecs + +// Bitcask-style append-only data file for disk-backed components. +// +// The diskStore only manages the file. It does not know about entities, components, +// or indexes. The diskColumn[T] handles all indexing and serialization. +// +// References: +// - Bitcask paper: https://riak.com/assets/bitcask-intro.pdf +// - Bitcask architecture: https://arpitbhayani.me/blogs/bitcask/ + +import ( + "encoding/binary" + "os" + "path/filepath" + "sync" + + "github.com/rotisserie/eris" +) + +// record header size: valueSize(4) = 4 bytes +// Key is omitted because each diskColumn manages its own records (key is always the component name). +const recordHeaderSize = 4 + +// diskStore manages the append-only data file for disk-backed components. +type diskStore struct { + dataFile *os.File + writeOffset int64 + mu sync.Mutex +} + +// newDiskStore opens (or creates) the data file. +func newDiskStore(basePath string) (*diskStore, error) { + if err := os.MkdirAll(basePath, 0o755); err != nil { + return nil, eris.Wrap(err, "failed to create disk store directory") + } + + dataPath := filepath.Join(basePath, "components.dat") + f, err := os.OpenFile(dataPath, os.O_RDWR|os.O_CREATE, 0o644) + if err != nil { + return nil, eris.Wrap(err, "failed to open data file") + } + + // Get current file size for write offset. + info, err := f.Stat() + if err != nil { + f.Close() + return nil, eris.Wrap(err, "failed to stat data file") + } + + return &diskStore{ + dataFile: f, + writeOffset: info.Size(), + }, nil +} + +// appendRecord appends a value to the data file and returns the offset. +// Record format: [valueSize: 4 bytes][value bytes] +func (ds *diskStore) appendRecord(value []byte) (int64, error) { + ds.mu.Lock() + defer ds.mu.Unlock() + + recordOffset := ds.writeOffset + + record := make([]byte, recordHeaderSize+len(value)) + binary.LittleEndian.PutUint32(record[0:4], uint32(len(value))) + copy(record[recordHeaderSize:], value) + + if _, err := ds.dataFile.WriteAt(record, ds.writeOffset); err != nil { + return 0, err + } + ds.writeOffset += int64(len(record)) + + return recordOffset, nil +} + +// readRecord reads a value from the data file at the given entry's offset. +// Record format: [valueSize: 4 bytes][value bytes] +// Validates that the on-disk valueSize matches entry.size for corruption detection. +func (ds *diskStore) readRecord(entry keyDirEntry) ([]byte, error) { + ds.mu.Lock() + defer ds.mu.Unlock() + + header := make([]byte, recordHeaderSize) + if _, err := ds.dataFile.ReadAt(header, entry.offset); err != nil { + return nil, err + } + + valueSize := binary.LittleEndian.Uint32(header[0:4]) + + if valueSize > 100*1024*1024 { + return nil, eris.Errorf("record value size %d exceeds maximum", valueSize) + } + + // Validate against the in-memory entry size for corruption detection. + if entry.size > 0 && valueSize != entry.size { + return nil, eris.Errorf("record size mismatch: header says %d, index says %d (possible corruption)", valueSize, entry.size) + } + + dataOffset := entry.offset + int64(recordHeaderSize) + value := make([]byte, valueSize) + if _, err := ds.dataFile.ReadAt(value, dataOffset); err != nil { + return nil, err + } + + return value, nil +} + +// compact rewrites the data file using only the live entries provided. +// Called by World.CompactDisk which collects live entries from all diskColumns. +func (ds *diskStore) compact(liveRecords []compactRecord) error { + ds.mu.Lock() + defer ds.mu.Unlock() + + basePath := filepath.Dir(ds.dataFile.Name()) + compactPath := filepath.Join(basePath, "components.compact.dat") + + compactFile, err := os.OpenFile(compactPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o644) + if err != nil { + return eris.Wrap(err, "failed to create compaction file") + } + + var newOffset int64 + for i := range liveRecords { + rec := &liveRecords[i] + record := make([]byte, recordHeaderSize+len(rec.value)) + binary.LittleEndian.PutUint32(record[0:4], uint32(len(rec.value))) + copy(record[recordHeaderSize:], rec.value) + + if _, err := compactFile.WriteAt(record, newOffset); err != nil { + compactFile.Close() + os.Remove(compactPath) + return eris.Wrap(err, "failed to write compacted record") + } + + // Write new offset/size back into the record so the caller can apply them. + rec.offset = newOffset + rec.size = uint32(len(rec.value)) + + newOffset += int64(len(record)) + } + + if err := compactFile.Sync(); err != nil { + compactFile.Close() + os.Remove(compactPath) + return eris.Wrap(err, "failed to fsync compacted file") + } + compactFile.Close() + + oldPath := ds.dataFile.Name() + if err := os.Rename(compactPath, oldPath); err != nil { + return eris.Wrap(err, "failed to rename compacted file") + } + + ds.dataFile.Close() + f, err := os.OpenFile(oldPath, os.O_RDWR, 0o644) + if err != nil { + return eris.Wrap(err, "failed to reopen compacted file") + } + + ds.dataFile = f + ds.writeOffset = newOffset + return nil +} + +// compactRecord is passed to compact() with the data to write. +// compact() updates offset and size in place after writing. +// The row field identifies which entry in the source column to update. +type compactRecord struct { + value []byte + row int // Row index in the source diskColumn's entries slice. + offset int64 // Updated by compact() with the new offset in the compacted file. + size uint32 // Updated by compact() with the value size. +} + +// readAll reads the entire data file as a byte blob. +func (ds *diskStore) readAll() ([]byte, error) { + ds.mu.Lock() + defer ds.mu.Unlock() + + data := make([]byte, ds.writeOffset) + if ds.writeOffset == 0 { + return data, nil + } + _, err := ds.dataFile.ReadAt(data, 0) + if err != nil { + return nil, eris.Wrap(err, "failed to read data file") + } + return data, nil +} + +// writeAll replaces the data file contents with the given blob. +func (ds *diskStore) writeAll(data []byte) error { + ds.mu.Lock() + defer ds.mu.Unlock() + + if err := ds.dataFile.Truncate(0); err != nil { + return eris.Wrap(err, "failed to truncate data file") + } + if len(data) > 0 { + if _, err := ds.dataFile.WriteAt(data, 0); err != nil { + return eris.Wrap(err, "failed to write data file") + } + } + ds.writeOffset = int64(len(data)) + return nil +} + +// close closes the data file. +func (ds *diskStore) close() error { + if ds.dataFile != nil { + return ds.dataFile.Close() + } + return nil +} + +// reset truncates the data file. Panics if truncation fails, since a failed truncate +// means the filesystem is in a bad state and continuing would mix stale and new data. +func (ds *diskStore) reset() { + ds.mu.Lock() + defer ds.mu.Unlock() + if err := ds.dataFile.Truncate(0); err != nil { + panic("disk store: failed to truncate data file on reset: " + err.Error()) + } + ds.writeOffset = 0 +} + diff --git a/pkg/cardinal/internal/ecs/ecs.go b/pkg/cardinal/internal/ecs/ecs.go index 0ee59ff2b..c3f95b21e 100644 --- a/pkg/cardinal/internal/ecs/ecs.go +++ b/pkg/cardinal/internal/ecs/ecs.go @@ -19,8 +19,10 @@ func CreateWithArchetype(world *World, components bitmap.Bitmap) EntityID { } // Destroy deletes an entity and all its components from the world. Returns true if the entity is -// deleted, false otherwise. +// deleted, false otherwise. Also cleans up any disk component files for the entity. func Destroy(world *World, eid EntityID) bool { + // The archetype's removeEntity handles both in-memory and disk columns. + // Disk column entries are removed via the normal swap-remove path. return world.state.removeEntity(eid) } @@ -30,32 +32,37 @@ func Alive(world *World, eid EntityID) bool { return exists } -// Set sets a component on an entity. If the entity contains the component type, it will update the -// value. If it doesn't, it will add the component. +// Set sets a component on an entity. Routes through the archetype column, +// which is either column[T] (in-memory) or diskColumn[T] (disk-backed). func Set[T Component](world *World, eid EntityID, component T) error { return setComponent(world.state, eid, component) } -// Get gets a component from an entity. -// Returns an error if the entity doesn't exist or doesn't contain the component type. +// Get gets a component from an entity. Routes through the archetype column, +// which is either column[T] (in-memory) or diskColumn[T] (disk-backed). func Get[T Component](world *World, eid EntityID) (T, error) { return getComponent[T](world.state, eid) } // Remove removes a component from an entity. -// Returns an error if the entity or the component to remove doesn't exist. func Remove[T Component](world *World, eid EntityID) error { return removeComponent[T](world.state, eid) } // Has checks if an entity has a specific component type. +// Uses the archetype bitmap for O(1) lookup. No disk read for disk components. // Returns false if either the entity doesn't exist or doesn't have the component. func Has[T Component](world *World, eid EntityID) bool { - _, err := Get[T](world, eid) - if err == nil { - return true + var zero T + cid, err := world.state.components.getID(zero.Name()) + if err != nil { + return false } - return eris.Is(err, ErrComponentNotFound) + aid, exists := world.state.entityArch.get(eid) + if !exists { + return false + } + return world.state.archetypes[aid].components.Contains(cid) } // IterEntities iterates all entities that match the given component bitmap and match mode. diff --git a/pkg/cardinal/internal/ecs/world.go b/pkg/cardinal/internal/ecs/world.go index 5ed4c2feb..8e0173369 100644 --- a/pkg/cardinal/internal/ecs/world.go +++ b/pkg/cardinal/internal/ecs/world.go @@ -17,6 +17,8 @@ type World struct { scheduler [3]systemScheduler // Systems schedulers (PreTick, Update, PostTick) systemEvents systemEventManager // Manages system events onComponentRegister func(Component) error // Callback called when a component is registered + disk *diskStore // Disk-backed component storage (nil if no disk components) + diskStoragePath string // Path for disk storage (from config) } // NewWorld creates a new World instance. @@ -70,9 +72,13 @@ func (w *World) Tick() { // Reset clears the world state back to its initial empty state. // Components remain registered but all entities and archetypes are cleared. +// Disk storage is also cleared to prevent stale data from reused entity IDs. func (w *World) Reset() { w.state.reset() w.initialized = false + if w.disk != nil { + w.disk.reset() + } } // SetOnSystemRun sets a callback invoked after each system execution. @@ -112,6 +118,120 @@ func (w *World) OnComponentRegister(callback func(zero Component) error) { w.onComponentRegister = callback } +// SetDiskStoragePath configures the path for disk-backed components. +// The disk store itself is created lazily when the first disk component is registered. +func (w *World) SetDiskStoragePath(path string) { + w.diskStoragePath = path +} + +// compactableDiskColumn is implemented by diskColumn[T] to support compaction. +type compactableDiskColumn interface { + collectLiveRecords() ([]compactRecord, error) + applyCompactedOffsets(records []compactRecord) +} + +// columnRecords tracks a column and its record range in the allRecords slice. +type columnRecords struct { + col compactableDiskColumn + start, end int +} + +// CompactDisk rewrites the data file with only live entries from all disk columns. +func (w *World) CompactDisk() error { + if w.disk == nil { + return nil + } + + // Collect all live records from all disk columns, tracking per-column ranges. + var allRecords []compactRecord + var columns []columnRecords + for _, arch := range w.state.archetypes { + for _, col := range arch.columns { + if dc, ok := col.(compactableDiskColumn); ok { + start := len(allRecords) + records, err := dc.collectLiveRecords() + if err != nil { + return err + } + allRecords = append(allRecords, records...) + columns = append(columns, columnRecords{col: dc, start: start, end: len(allRecords)}) + } + } + } + + // Rewrite the file. compact() updates offset/size in each record. + if err := w.disk.compact(allRecords); err != nil { + return err + } + + // Apply the new offsets back to each column's entries. + for _, cr := range columns { + cr.col.applyCompactedOffsets(allRecords[cr.start:cr.end]) + } + return nil +} + +// CloseDisk closes the disk store's data file. +func (w *World) CloseDisk() error { + if w.disk == nil { + return nil + } + return w.disk.close() +} + +// DiskStore returns the disk store, or nil if disk storage is not enabled. +func (w *World) DiskStore() *diskStore { + return w.disk +} + +// flushableDiskColumn is implemented by diskColumn[T] to support flushing pending writes. +type flushableDiskColumn interface { + flush() error +} + +// FlushDisk writes all modified disk components back to disk and clears pending buffers. +// Called after all systems finish in a tick. +func (w *World) FlushDisk() error { + if w.disk == nil { + return nil + } + // Walk all archetypes and flush any disk columns. + for _, arch := range w.state.archetypes { + for _, col := range arch.columns { + if dc, ok := col.(flushableDiskColumn); ok { + if err := dc.flush(); err != nil { + return err + } + } + } + } + return nil +} + + +// ExportDiskFile returns the raw Bitcask file bytes for inclusion in the snapshot. +// Must flush before calling. Returns nil if disk storage is not enabled. +func (w *World) ExportDiskFile() ([]byte, error) { + if w.disk == nil { + return nil, nil + } + return w.disk.readAll() +} + +// ImportDiskFile writes the raw Bitcask file bytes from a snapshot. +// Must be called BEFORE FromProto so that diskColumn offsets point to valid data. +func (w *World) ImportDiskFile(data []byte) error { + if w.disk == nil { + return nil + } + // nil/empty data means snapshot had no disk state; reset the file to avoid stale offsets. + if len(data) == 0 { + w.disk.reset() + return nil + } + return w.disk.writeAll(data) +} + // ------------------------------------------------------------------------------------------------- // Serialization methods // ------------------------------------------------------------------------------------------------- diff --git a/pkg/cardinal/internal/ecs/world_state.go b/pkg/cardinal/internal/ecs/world_state.go index 4d1ee857e..b607fb363 100644 --- a/pkg/cardinal/internal/ecs/world_state.go +++ b/pkg/cardinal/internal/ecs/world_state.go @@ -200,12 +200,17 @@ func setComponent[T Component](ws *worldState, eid EntityID, component T) error // Get the column from the archetype directly. index := archetype.components.CountTo(cid) - column, ok := archetype.columns[index].(*column[T]) - assert.That(ok, "unexpected column type") - row, exists := archetype.rows.get(eid) assert.That(exists, "entity should have a row in its archetype") - column.set(row, component) + + switch col := archetype.columns[index].(type) { + case *column[T]: + col.set(row, component) + case *diskColumn[T]: + col.set(row, component) + default: + assert.That(false, "unexpected column type") + } return nil } @@ -231,12 +236,18 @@ func getComponent[T Component](ws *worldState, eid EntityID) (T, error) { // Get the column from the archetype directly. index := archetype.components.CountTo(cid) - column, ok := archetype.columns[index].(*column[T]) - assert.That(ok, "unexpected column type") - row, exists := archetype.rows.get(eid) assert.That(exists, "entity should have a row in its archetype") - return column.get(row), nil + + switch col := archetype.columns[index].(type) { + case *column[T]: + return col.get(row), nil + case *diskColumn[T]: + return col.get(row), nil + default: + assert.That(false, "unexpected column type") + return zero, eris.New("unexpected column type") + } } // removeComponent removes a component from the given entity. Returns an error if the entity or the diff --git a/pkg/cardinal/snapshot/snapshot.go b/pkg/cardinal/snapshot/snapshot.go index 0f4cd4464..48e08fb81 100644 --- a/pkg/cardinal/snapshot/snapshot.go +++ b/pkg/cardinal/snapshot/snapshot.go @@ -14,8 +14,10 @@ import ( type Snapshot struct { TickHeight uint64 Timestamp time.Time - Data []byte - Version uint32 + Data []byte // ECS world state (protobuf-encoded WorldState) + DiskState []byte // Disk component file bytes (nil if disk storage not used) + DiskStateChecksum []byte // SHA-256 of DiskState for integrity verification + Version uint32 } const CurrentVersion uint32 = 1 diff --git a/pkg/cardinal/snapshot/storage_jetstream.go b/pkg/cardinal/snapshot/storage_jetstream.go index 7a2346d37..1ae64105c 100644 --- a/pkg/cardinal/snapshot/storage_jetstream.go +++ b/pkg/cardinal/snapshot/storage_jetstream.go @@ -90,10 +90,12 @@ func (j *JetStreamStorage) Store(ctx context.Context, snapshot *Snapshot) error return eris.Wrap(err, "failed to unmarshal world state") } snapshotPb := &cardinalv1.Snapshot{ - TickHeight: snapshot.TickHeight, - Timestamp: timestamppb.New(snapshot.Timestamp), - WorldState: &worldState, - Version: snapshot.Version, + TickHeight: snapshot.TickHeight, + Timestamp: timestamppb.New(snapshot.Timestamp), + WorldState: &worldState, + Version: snapshot.Version, + DiskState: snapshot.DiskState, + DiskStateChecksum: snapshot.DiskStateChecksum, } data, err := proto.Marshal(snapshotPb) if err != nil { @@ -139,10 +141,12 @@ func (j *JetStreamStorage) Load(ctx context.Context) (*Snapshot, error) { } return &Snapshot{ - TickHeight: snapshotPb.GetTickHeight(), - Timestamp: snapshotPb.GetTimestamp().AsTime(), - Data: worldStateBytes, - Version: snapshotPb.GetVersion(), + TickHeight: snapshotPb.GetTickHeight(), + Timestamp: snapshotPb.GetTimestamp().AsTime(), + Data: worldStateBytes, + DiskState: snapshotPb.GetDiskState(), + DiskStateChecksum: snapshotPb.GetDiskStateChecksum(), + Version: snapshotPb.GetVersion(), }, nil } diff --git a/pkg/cardinal/snapshot/storage_s3.go b/pkg/cardinal/snapshot/storage_s3.go index c2086d0b0..412756767 100644 --- a/pkg/cardinal/snapshot/storage_s3.go +++ b/pkg/cardinal/snapshot/storage_s3.go @@ -106,10 +106,12 @@ func (s *S3Storage) Store(ctx context.Context, snapshot *Snapshot) error { return eris.Wrap(err, "failed to unmarshal world state") } snapshotPb := &cardinalv1.Snapshot{ - TickHeight: snapshot.TickHeight, - Timestamp: timestamppb.New(snapshot.Timestamp), - WorldState: &worldState, - Version: snapshot.Version, + TickHeight: snapshot.TickHeight, + Timestamp: timestamppb.New(snapshot.Timestamp), + WorldState: &worldState, + Version: snapshot.Version, + DiskState: snapshot.DiskState, + DiskStateChecksum: snapshot.DiskStateChecksum, } data, err := proto.Marshal(snapshotPb) if err != nil { @@ -170,10 +172,12 @@ func (s *S3Storage) Load(ctx context.Context) (*Snapshot, error) { } return &Snapshot{ - TickHeight: snapshotPb.GetTickHeight(), - Timestamp: snapshotPb.GetTimestamp().AsTime(), - Data: worldStateBytes, - Version: snapshotPb.GetVersion(), + TickHeight: snapshotPb.GetTickHeight(), + Timestamp: snapshotPb.GetTimestamp().AsTime(), + Data: worldStateBytes, + DiskState: snapshotPb.GetDiskState(), + DiskStateChecksum: snapshotPb.GetDiskStateChecksum(), + Version: snapshotPb.GetVersion(), }, nil } diff --git a/pkg/cardinal/system.go b/pkg/cardinal/system.go index 0aaf865f7..686522241 100644 --- a/pkg/cardinal/system.go +++ b/pkg/cardinal/system.go @@ -163,6 +163,12 @@ func (b *BaseSystemState) init(meta *systemInitMetadata) error { // TODO: pass init args (similar to boot info) to get system name in logger. // Logger returns the logger for the world. +// World returns the cardinal World. Used for disk component operations +// (CreateEntity, SetDiskComponent, GetDiskComponent). +func (b *BaseSystemState) World() *World { + return b.world +} + func (b *BaseSystemState) Logger() *zerolog.Logger { logger := b.world.tel.GetLogger("system") return &logger diff --git a/proto/gen/csharp/WorldEngine/Proto/Cardinal/V1/Snapshot.cs b/proto/gen/csharp/WorldEngine/Proto/Cardinal/V1/Snapshot.cs index 0ff834b0d..4ab7fa3b9 100644 --- a/proto/gen/csharp/WorldEngine/Proto/Cardinal/V1/Snapshot.cs +++ b/proto/gen/csharp/WorldEngine/Proto/Cardinal/V1/Snapshot.cs @@ -26,28 +26,30 @@ static SnapshotReflection() { string.Concat( "CiZ3b3JsZGVuZ2luZS9jYXJkaW5hbC92MS9zbmFwc2hvdC5wcm90bxIXd29y", "bGRlbmdpbmUuY2FyZGluYWwudjEaG2J1Zi92YWxpZGF0ZS92YWxpZGF0ZS5w", - "cm90bxofZ29vZ2xlL3Byb3RvYnVmL3RpbWVzdGFtcC5wcm90byLFAQoIU25h", + "cm90bxofZ29vZ2xlL3Byb3RvYnVmL3RpbWVzdGFtcC5wcm90byKUAgoIU25h", "cHNob3QSHwoLdGlja19oZWlnaHQYASABKARSCnRpY2tIZWlnaHQSOAoJdGlt", "ZXN0YW1wGAIgASgLMhouZ29vZ2xlLnByb3RvYnVmLlRpbWVzdGFtcFIJdGlt", "ZXN0YW1wEkQKC3dvcmxkX3N0YXRlGAMgASgLMiMud29ybGRlbmdpbmUuY2Fy", "ZGluYWwudjEuV29ybGRTdGF0ZVIKd29ybGRTdGF0ZRIYCgd2ZXJzaW9uGAQg", - "ASgNUgd2ZXJzaW9uIqUBCgpXb3JsZFN0YXRlEhcKB25leHRfaWQYASABKA1S", - "Bm5leHRJZBIZCghmcmVlX2lkcxgCIAMoDVIHZnJlZUlkcxIfCgtlbnRpdHlf", - "YXJjaBgDIAMoA1IKZW50aXR5QXJjaBJCCgphcmNoZXR5cGVzGAQgAygLMiIu", - "d29ybGRlbmdpbmUuY2FyZGluYWwudjEuQXJjaGV0eXBlUgphcmNoZXR5cGVz", - "IrMBCglBcmNoZXR5cGUSDgoCaWQYASABKAVSAmlkEisKEWNvbXBvbmVudHNf", - "Yml0bWFwGAIgASgMUhBjb21wb25lbnRzQml0bWFwEhIKBHJvd3MYAyADKANS", - "BHJvd3MSGgoIZW50aXRpZXMYBCADKA1SCGVudGl0aWVzEjkKB2NvbHVtbnMY", - "BSADKAsyHy53b3JsZGVuZ2luZS5jYXJkaW5hbC52MS5Db2x1bW5SB2NvbHVt", - "bnMiWAoGQ29sdW1uEi4KDmNvbXBvbmVudF9uYW1lGAEgASgJQge6SARyAhAB", - "Ug1jb21wb25lbnROYW1lEh4KCmNvbXBvbmVudHMYAiADKAxSCmNvbXBvbmVu", - "dHNCdFpSZ2l0aHViLmNvbS9hcmd1cy1sYWJzL3dvcmxkLWVuZ2luZS9wcm90", - "by9nZW4vZ28vd29ybGRlbmdpbmUvY2FyZGluYWwvdjE7Y2FyZGluYWx2MaoC", - "HVdvcmxkRW5naW5lLlByb3RvLkNhcmRpbmFsLlYxYgZwcm90bzM=")); + "ASgNUgd2ZXJzaW9uEh0KCmRpc2tfc3RhdGUYBSABKAxSCWRpc2tTdGF0ZRIu", + "ChNkaXNrX3N0YXRlX2NoZWNrc3VtGAYgASgMUhFkaXNrU3RhdGVDaGVja3N1", + "bSKlAQoKV29ybGRTdGF0ZRIXCgduZXh0X2lkGAEgASgNUgZuZXh0SWQSGQoI", + "ZnJlZV9pZHMYAiADKA1SB2ZyZWVJZHMSHwoLZW50aXR5X2FyY2gYAyADKANS", + "CmVudGl0eUFyY2gSQgoKYXJjaGV0eXBlcxgEIAMoCzIiLndvcmxkZW5naW5l", + "LmNhcmRpbmFsLnYxLkFyY2hldHlwZVIKYXJjaGV0eXBlcyKzAQoJQXJjaGV0", + "eXBlEg4KAmlkGAEgASgFUgJpZBIrChFjb21wb25lbnRzX2JpdG1hcBgCIAEo", + "DFIQY29tcG9uZW50c0JpdG1hcBISCgRyb3dzGAMgAygDUgRyb3dzEhoKCGVu", + "dGl0aWVzGAQgAygNUghlbnRpdGllcxI5Cgdjb2x1bW5zGAUgAygLMh8ud29y", + "bGRlbmdpbmUuY2FyZGluYWwudjEuQ29sdW1uUgdjb2x1bW5zIlgKBkNvbHVt", + "bhIuCg5jb21wb25lbnRfbmFtZRgBIAEoCUIHukgEcgIQAVINY29tcG9uZW50", + "TmFtZRIeCgpjb21wb25lbnRzGAIgAygMUgpjb21wb25lbnRzQnRaUmdpdGh1", + "Yi5jb20vYXJndXMtbGFicy93b3JsZC1lbmdpbmUvcHJvdG8vZ2VuL2dvL3dv", + "cmxkZW5naW5lL2NhcmRpbmFsL3YxO2NhcmRpbmFsdjGqAh1Xb3JsZEVuZ2lu", + "ZS5Qcm90by5DYXJkaW5hbC5WMWIGcHJvdG8z")); descriptor = pbr::FileDescriptor.FromGeneratedCode(descriptorData, new pbr::FileDescriptor[] { global::Buf.Validate.ValidateReflection.Descriptor, global::Google.Protobuf.WellKnownTypes.TimestampReflection.Descriptor, }, new pbr::GeneratedClrTypeInfo(null, null, new pbr::GeneratedClrTypeInfo[] { - new pbr::GeneratedClrTypeInfo(typeof(global::WorldEngine.Proto.Cardinal.V1.Snapshot), global::WorldEngine.Proto.Cardinal.V1.Snapshot.Parser, new[]{ "TickHeight", "Timestamp", "WorldState", "Version" }, null, null, null, null), + new pbr::GeneratedClrTypeInfo(typeof(global::WorldEngine.Proto.Cardinal.V1.Snapshot), global::WorldEngine.Proto.Cardinal.V1.Snapshot.Parser, new[]{ "TickHeight", "Timestamp", "WorldState", "Version", "DiskState", "DiskStateChecksum" }, null, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::WorldEngine.Proto.Cardinal.V1.WorldState), global::WorldEngine.Proto.Cardinal.V1.WorldState.Parser, new[]{ "NextId", "FreeIds", "EntityArch", "Archetypes" }, null, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::WorldEngine.Proto.Cardinal.V1.Archetype), global::WorldEngine.Proto.Cardinal.V1.Archetype.Parser, new[]{ "Id", "ComponentsBitmap", "Rows", "Entities", "Columns" }, null, null, null, null), new pbr::GeneratedClrTypeInfo(typeof(global::WorldEngine.Proto.Cardinal.V1.Column), global::WorldEngine.Proto.Cardinal.V1.Column.Parser, new[]{ "ComponentName", "Components" }, null, null, null, null) @@ -99,6 +101,8 @@ public Snapshot(Snapshot other) : this() { timestamp_ = other.timestamp_ != null ? other.timestamp_.Clone() : null; worldState_ = other.worldState_ != null ? other.worldState_.Clone() : null; version_ = other.version_; + diskState_ = other.diskState_; + diskStateChecksum_ = other.diskStateChecksum_; _unknownFields = pb::UnknownFieldSet.Clone(other._unknownFields); } @@ -156,6 +160,36 @@ public uint Version { } } + /// Field number for the "disk_state" field. + public const int DiskStateFieldNumber = 5; + private pb::ByteString diskState_ = pb::ByteString.Empty; + /// + /// Disk component file state. Raw Bitcask file bytes. Empty if disk storage not used. + /// + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public pb::ByteString DiskState { + get { return diskState_; } + set { + diskState_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); + } + } + + /// Field number for the "disk_state_checksum" field. + public const int DiskStateChecksumFieldNumber = 6; + private pb::ByteString diskStateChecksum_ = pb::ByteString.Empty; + /// + /// SHA-256 checksum of disk_state for integrity verification on restore. + /// + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] + [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] + public pb::ByteString DiskStateChecksum { + get { return diskStateChecksum_; } + set { + diskStateChecksum_ = pb::ProtoPreconditions.CheckNotNull(value, "value"); + } + } + [global::System.Diagnostics.DebuggerNonUserCodeAttribute] [global::System.CodeDom.Compiler.GeneratedCode("protoc", null)] public override bool Equals(object other) { @@ -175,6 +209,8 @@ public bool Equals(Snapshot other) { if (!object.Equals(Timestamp, other.Timestamp)) return false; if (!object.Equals(WorldState, other.WorldState)) return false; if (Version != other.Version) return false; + if (DiskState != other.DiskState) return false; + if (DiskStateChecksum != other.DiskStateChecksum) return false; return Equals(_unknownFields, other._unknownFields); } @@ -186,6 +222,8 @@ public override int GetHashCode() { if (timestamp_ != null) hash ^= Timestamp.GetHashCode(); if (worldState_ != null) hash ^= WorldState.GetHashCode(); if (Version != 0) hash ^= Version.GetHashCode(); + if (DiskState.Length != 0) hash ^= DiskState.GetHashCode(); + if (DiskStateChecksum.Length != 0) hash ^= DiskStateChecksum.GetHashCode(); if (_unknownFields != null) { hash ^= _unknownFields.GetHashCode(); } @@ -220,6 +258,14 @@ public void WriteTo(pb::CodedOutputStream output) { output.WriteRawTag(32); output.WriteUInt32(Version); } + if (DiskState.Length != 0) { + output.WriteRawTag(42); + output.WriteBytes(DiskState); + } + if (DiskStateChecksum.Length != 0) { + output.WriteRawTag(50); + output.WriteBytes(DiskStateChecksum); + } if (_unknownFields != null) { _unknownFields.WriteTo(output); } @@ -246,6 +292,14 @@ public void WriteTo(pb::CodedOutputStream output) { output.WriteRawTag(32); output.WriteUInt32(Version); } + if (DiskState.Length != 0) { + output.WriteRawTag(42); + output.WriteBytes(DiskState); + } + if (DiskStateChecksum.Length != 0) { + output.WriteRawTag(50); + output.WriteBytes(DiskStateChecksum); + } if (_unknownFields != null) { _unknownFields.WriteTo(ref output); } @@ -268,6 +322,12 @@ public int CalculateSize() { if (Version != 0) { size += 1 + pb::CodedOutputStream.ComputeUInt32Size(Version); } + if (DiskState.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeBytesSize(DiskState); + } + if (DiskStateChecksum.Length != 0) { + size += 1 + pb::CodedOutputStream.ComputeBytesSize(DiskStateChecksum); + } if (_unknownFields != null) { size += _unknownFields.CalculateSize(); } @@ -298,6 +358,12 @@ public void MergeFrom(Snapshot other) { if (other.Version != 0) { Version = other.Version; } + if (other.DiskState.Length != 0) { + DiskState = other.DiskState; + } + if (other.DiskStateChecksum.Length != 0) { + DiskStateChecksum = other.DiskStateChecksum; + } _unknownFields = pb::UnknownFieldSet.MergeFrom(_unknownFields, other._unknownFields); } @@ -339,6 +405,14 @@ public void MergeFrom(pb::CodedInputStream input) { Version = input.ReadUInt32(); break; } + case 42: { + DiskState = input.ReadBytes(); + break; + } + case 50: { + DiskStateChecksum = input.ReadBytes(); + break; + } } } #endif @@ -380,6 +454,14 @@ public void MergeFrom(pb::CodedInputStream input) { Version = input.ReadUInt32(); break; } + case 42: { + DiskState = input.ReadBytes(); + break; + } + case 50: { + DiskStateChecksum = input.ReadBytes(); + break; + } } } } diff --git a/proto/gen/go/worldengine/cardinal/v1/debug.pb.go b/proto/gen/go/worldengine/cardinal/v1/debug.pb.go index 99acbc8d5..a6adcb2f2 100644 --- a/proto/gen/go/worldengine/cardinal/v1/debug.pb.go +++ b/proto/gen/go/worldengine/cardinal/v1/debug.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.11 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: worldengine/cardinal/v1/debug.proto diff --git a/proto/gen/go/worldengine/cardinal/v1/snapshot.pb.go b/proto/gen/go/worldengine/cardinal/v1/snapshot.pb.go index f5e96218d..e096aa4b9 100644 --- a/proto/gen/go/worldengine/cardinal/v1/snapshot.pb.go +++ b/proto/gen/go/worldengine/cardinal/v1/snapshot.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.11 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: worldengine/cardinal/v1/snapshot.proto @@ -25,13 +25,17 @@ const ( // Snapshot represents a point-in-time capture of shard state. type Snapshot struct { - state protoimpl.MessageState `protogen:"open.v1"` - TickHeight uint64 `protobuf:"varint,1,opt,name=tick_height,json=tickHeight,proto3" json:"tick_height,omitempty"` - Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - WorldState *WorldState `protobuf:"bytes,3,opt,name=world_state,json=worldState,proto3" json:"world_state,omitempty"` - Version uint32 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + TickHeight uint64 `protobuf:"varint,1,opt,name=tick_height,json=tickHeight,proto3" json:"tick_height,omitempty"` + Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + WorldState *WorldState `protobuf:"bytes,3,opt,name=world_state,json=worldState,proto3" json:"world_state,omitempty"` + Version uint32 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` + // Disk component file state. Raw Bitcask file bytes. Empty if disk storage not used. + DiskState []byte `protobuf:"bytes,5,opt,name=disk_state,json=diskState,proto3" json:"disk_state,omitempty"` + // SHA-256 checksum of disk_state for integrity verification on restore. + DiskStateChecksum []byte `protobuf:"bytes,6,opt,name=disk_state_checksum,json=diskStateChecksum,proto3" json:"disk_state_checksum,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Snapshot) Reset() { @@ -92,6 +96,20 @@ func (x *Snapshot) GetVersion() uint32 { return 0 } +func (x *Snapshot) GetDiskState() []byte { + if x != nil { + return x.DiskState + } + return nil +} + +func (x *Snapshot) GetDiskStateChecksum() []byte { + if x != nil { + return x.DiskStateChecksum + } + return nil +} + // WorldState represents the ECS world state. type WorldState struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -305,14 +323,17 @@ var File_worldengine_cardinal_v1_snapshot_proto protoreflect.FileDescriptor const file_worldengine_cardinal_v1_snapshot_proto_rawDesc = "" + "\n" + - "&worldengine/cardinal/v1/snapshot.proto\x12\x17worldengine.cardinal.v1\x1a\x1bbuf/validate/validate.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\xc5\x01\n" + + "&worldengine/cardinal/v1/snapshot.proto\x12\x17worldengine.cardinal.v1\x1a\x1bbuf/validate/validate.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"\x94\x02\n" + "\bSnapshot\x12\x1f\n" + "\vtick_height\x18\x01 \x01(\x04R\n" + "tickHeight\x128\n" + "\ttimestamp\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\ttimestamp\x12D\n" + "\vworld_state\x18\x03 \x01(\v2#.worldengine.cardinal.v1.WorldStateR\n" + "worldState\x12\x18\n" + - "\aversion\x18\x04 \x01(\rR\aversion\"\xa5\x01\n" + + "\aversion\x18\x04 \x01(\rR\aversion\x12\x1d\n" + + "\n" + + "disk_state\x18\x05 \x01(\fR\tdiskState\x12.\n" + + "\x13disk_state_checksum\x18\x06 \x01(\fR\x11diskStateChecksum\"\xa5\x01\n" + "\n" + "WorldState\x12\x17\n" + "\anext_id\x18\x01 \x01(\rR\x06nextId\x12\x19\n" + diff --git a/proto/gen/go/worldengine/isc/v1/command.pb.go b/proto/gen/go/worldengine/isc/v1/command.pb.go index f5d7c23e8..3b138bb9d 100644 --- a/proto/gen/go/worldengine/isc/v1/command.pb.go +++ b/proto/gen/go/worldengine/isc/v1/command.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.11 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: worldengine/isc/v1/command.proto diff --git a/proto/gen/go/worldengine/isc/v1/epoch.pb.go b/proto/gen/go/worldengine/isc/v1/epoch.pb.go index 96a6b9d6f..537919852 100644 --- a/proto/gen/go/worldengine/isc/v1/epoch.pb.go +++ b/proto/gen/go/worldengine/isc/v1/epoch.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.11 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: worldengine/isc/v1/epoch.proto diff --git a/proto/gen/go/worldengine/isc/v1/event.pb.go b/proto/gen/go/worldengine/isc/v1/event.pb.go index 52d78a7f8..5abb83e1b 100644 --- a/proto/gen/go/worldengine/isc/v1/event.pb.go +++ b/proto/gen/go/worldengine/isc/v1/event.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.11 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: worldengine/isc/v1/event.proto diff --git a/proto/gen/go/worldengine/isc/v1/persona.pb.go b/proto/gen/go/worldengine/isc/v1/persona.pb.go index 7e78b025b..4a5d2e79a 100644 --- a/proto/gen/go/worldengine/isc/v1/persona.pb.go +++ b/proto/gen/go/worldengine/isc/v1/persona.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.11 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: worldengine/isc/v1/persona.proto diff --git a/proto/gen/go/worldengine/isc/v1/query.pb.go b/proto/gen/go/worldengine/isc/v1/query.pb.go index 570852ef8..ed4fc6a49 100644 --- a/proto/gen/go/worldengine/isc/v1/query.pb.go +++ b/proto/gen/go/worldengine/isc/v1/query.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.11 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: worldengine/isc/v1/query.proto diff --git a/proto/gen/go/worldengine/micro/v1/response.pb.go b/proto/gen/go/worldengine/micro/v1/response.pb.go index 927c5e0c3..2ac8eed73 100644 --- a/proto/gen/go/worldengine/micro/v1/response.pb.go +++ b/proto/gen/go/worldengine/micro/v1/response.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.11 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: worldengine/micro/v1/response.proto diff --git a/proto/gen/go/worldengine/micro/v1/service.pb.go b/proto/gen/go/worldengine/micro/v1/service.pb.go index 4e7e7e616..296fc1692 100644 --- a/proto/gen/go/worldengine/micro/v1/service.pb.go +++ b/proto/gen/go/worldengine/micro/v1/service.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.11 +// protoc-gen-go v1.36.10 // protoc (unknown) // source: worldengine/micro/v1/service.proto diff --git a/proto/gen/ts/worldengine/cardinal/v1/snapshot_pb.ts b/proto/gen/ts/worldengine/cardinal/v1/snapshot_pb.ts index cf599671e..70da2dc25 100644 --- a/proto/gen/ts/worldengine/cardinal/v1/snapshot_pb.ts +++ b/proto/gen/ts/worldengine/cardinal/v1/snapshot_pb.ts @@ -13,7 +13,7 @@ import type { Message } from "@bufbuild/protobuf"; * Describes the file worldengine/cardinal/v1/snapshot.proto. */ export const file_worldengine_cardinal_v1_snapshot: GenFile = /*@__PURE__*/ - fileDesc("CiZ3b3JsZGVuZ2luZS9jYXJkaW5hbC92MS9zbmFwc2hvdC5wcm90bxIXd29ybGRlbmdpbmUuY2FyZGluYWwudjEimQEKCFNuYXBzaG90EhMKC3RpY2tfaGVpZ2h0GAEgASgEEi0KCXRpbWVzdGFtcBgCIAEoCzIaLmdvb2dsZS5wcm90b2J1Zi5UaW1lc3RhbXASOAoLd29ybGRfc3RhdGUYAyABKAsyIy53b3JsZGVuZ2luZS5jYXJkaW5hbC52MS5Xb3JsZFN0YXRlEg8KB3ZlcnNpb24YBCABKA0ifAoKV29ybGRTdGF0ZRIPCgduZXh0X2lkGAEgASgNEhAKCGZyZWVfaWRzGAIgAygNEhMKC2VudGl0eV9hcmNoGAMgAygDEjYKCmFyY2hldHlwZXMYBCADKAsyIi53b3JsZGVuZ2luZS5jYXJkaW5hbC52MS5BcmNoZXR5cGUihAEKCUFyY2hldHlwZRIKCgJpZBgBIAEoBRIZChFjb21wb25lbnRzX2JpdG1hcBgCIAEoDBIMCgRyb3dzGAMgAygDEhAKCGVudGl0aWVzGAQgAygNEjAKB2NvbHVtbnMYBSADKAsyHy53b3JsZGVuZ2luZS5jYXJkaW5hbC52MS5Db2x1bW4iPQoGQ29sdW1uEh8KDmNvbXBvbmVudF9uYW1lGAEgASgJQge6SARyAhABEhIKCmNvbXBvbmVudHMYAiADKAxCdFpSZ2l0aHViLmNvbS9hcmd1cy1sYWJzL3dvcmxkLWVuZ2luZS9wcm90by9nZW4vZ28vd29ybGRlbmdpbmUvY2FyZGluYWwvdjE7Y2FyZGluYWx2MaoCHVdvcmxkRW5naW5lLlByb3RvLkNhcmRpbmFsLlYxYgZwcm90bzM", [file_buf_validate_validate, file_google_protobuf_timestamp]); + fileDesc("CiZ3b3JsZGVuZ2luZS9jYXJkaW5hbC92MS9zbmFwc2hvdC5wcm90bxIXd29ybGRlbmdpbmUuY2FyZGluYWwudjEiygEKCFNuYXBzaG90EhMKC3RpY2tfaGVpZ2h0GAEgASgEEi0KCXRpbWVzdGFtcBgCIAEoCzIaLmdvb2dsZS5wcm90b2J1Zi5UaW1lc3RhbXASOAoLd29ybGRfc3RhdGUYAyABKAsyIy53b3JsZGVuZ2luZS5jYXJkaW5hbC52MS5Xb3JsZFN0YXRlEg8KB3ZlcnNpb24YBCABKA0SEgoKZGlza19zdGF0ZRgFIAEoDBIbChNkaXNrX3N0YXRlX2NoZWNrc3VtGAYgASgMInwKCldvcmxkU3RhdGUSDwoHbmV4dF9pZBgBIAEoDRIQCghmcmVlX2lkcxgCIAMoDRITCgtlbnRpdHlfYXJjaBgDIAMoAxI2CgphcmNoZXR5cGVzGAQgAygLMiIud29ybGRlbmdpbmUuY2FyZGluYWwudjEuQXJjaGV0eXBlIoQBCglBcmNoZXR5cGUSCgoCaWQYASABKAUSGQoRY29tcG9uZW50c19iaXRtYXAYAiABKAwSDAoEcm93cxgDIAMoAxIQCghlbnRpdGllcxgEIAMoDRIwCgdjb2x1bW5zGAUgAygLMh8ud29ybGRlbmdpbmUuY2FyZGluYWwudjEuQ29sdW1uIj0KBkNvbHVtbhIfCg5jb21wb25lbnRfbmFtZRgBIAEoCUIHukgEcgIQARISCgpjb21wb25lbnRzGAIgAygMQnRaUmdpdGh1Yi5jb20vYXJndXMtbGFicy93b3JsZC1lbmdpbmUvcHJvdG8vZ2VuL2dvL3dvcmxkZW5naW5lL2NhcmRpbmFsL3YxO2NhcmRpbmFsdjGqAh1Xb3JsZEVuZ2luZS5Qcm90by5DYXJkaW5hbC5WMWIGcHJvdG8z", [file_buf_validate_validate, file_google_protobuf_timestamp]); /** * Snapshot represents a point-in-time capture of shard state. @@ -40,6 +40,20 @@ export type Snapshot = Message<"worldengine.cardinal.v1.Snapshot"> & { * @generated from field: uint32 version = 4; */ version: number; + + /** + * Disk component file state. Raw Bitcask file bytes. Empty if disk storage not used. + * + * @generated from field: bytes disk_state = 5; + */ + diskState: Uint8Array; + + /** + * SHA-256 checksum of disk_state for integrity verification on restore. + * + * @generated from field: bytes disk_state_checksum = 6; + */ + diskStateChecksum: Uint8Array; }; /** diff --git a/proto/worldengine/cardinal/v1/snapshot.proto b/proto/worldengine/cardinal/v1/snapshot.proto index 1f74e8e8f..0b3be87f7 100644 --- a/proto/worldengine/cardinal/v1/snapshot.proto +++ b/proto/worldengine/cardinal/v1/snapshot.proto @@ -17,6 +17,12 @@ message Snapshot { WorldState world_state = 3; uint32 version = 4; + + // Disk component file state. Raw Bitcask file bytes. Empty if disk storage not used. + bytes disk_state = 5; + + // SHA-256 checksum of disk_state for integrity verification on restore. + bytes disk_state_checksum = 6; } // WorldState represents the ECS world state.