diff --git a/mod/objects/src/db.go b/mod/objects/src/db.go
index 8aaf3259..c3474abd 100644
--- a/mod/objects/src/db.go
+++ b/mod/objects/src/db.go
@@ -1,8 +1,11 @@
 package objects
 
 import (
+	"time"
+
 	"github.com/cryptopunkscc/astrald/astral"
 	"gorm.io/gorm"
+	"gorm.io/gorm/clause"
 )
 
 type DB struct {
@@ -29,12 +32,89 @@ func (db *DB) Find(id *astral.ObjectID) (row *dbObject, err error) {
 	return
 }
 
-func (db *DB) Create(id *astral.ObjectID, objectType string) (err error) {
-	err = db.DB.Create(&dbObject{
-		ID:   id,
-		Type: objectType,
+// Create seeds a tracking row for id with the given type. Idempotent: if a
+// row for id already exists, the call is a no-op (existing Type and ReadAt
+// are preserved). Used by every "object entered the device" path —
+// Module.Store/Load/Probe/GetType and OpCreate — to keep dbObject in sync
+// with what flows through the module.
+func (db *DB) Create(id *astral.ObjectID, objectType string) error {
+	return db.DB.Clauses(clause.OnConflict{DoNothing: true}).Create(&dbObject{
+		ID:     id,
+		Type:   objectType,
+		ReadAt: time.Now(),
 	}).Error
-	return
+}
+
+// UpdateReadAt flushes a batch of pending read times. It is UPDATE-only: each
+// id present in the table has its read_at bumped, and ids that aren't already
+// tracked are silently skipped — first reads do NOT seed rows here.
+func (db *DB) UpdateReadAt(reads map[astral.ObjectID]astral.Time) error {
+	if len(reads) == 0 {
+		return nil
+	}
+
+	return db.DB.Session(&gorm.Session{PrepareStmt: true}).
+		Transaction(func(tx *gorm.DB) error {
+			for id, at := range reads {
+				err := tx.Model(&dbObject{}).
+					Where("id = ?", &id).
+					Update("read_at", at.Time()).Error
+				if err != nil {
+					return err
+				}
+			}
+			return nil
+		})
+}
+
+// readCursor marks a position in the (read_at, height) read order.
+type readCursor struct {
+	ReadAt time.Time
+	Height uint64
+}
+
+// ListReadOldest returns up to limit object IDs ordered oldest-read first, starting
+// after the cursor (nil = from the beginning). next is the cursor to resume
+// from, or nil once the final page has been returned.
+func (db *DB) ListReadOldest(after *readCursor, limit int) (ids []*astral.ObjectID, next *readCursor, err error) {
+	if limit <= 0 {
+		return nil, nil, nil
+	}
+
+	q := db.DB.Order("read_at, height").Limit(limit)
+	if after != nil {
+		// keyset seek: (read_at, height) is a total order since height is unique.
+		// the row-value form maps directly onto the read_at index, whose entries
+		// implicitly carry height (the rowid) as their tiebreaker.
+		q = q.Where("(read_at, height) > (?, ?)", after.ReadAt, after.Height)
+	}
+
+	var rows []*dbObject
+	err = q.Find(&rows).Error
+	if err != nil {
+		return nil, nil, err
+	}
+
+	ids = make([]*astral.ObjectID, len(rows))
+	for i, row := range rows {
+		ids[i] = row.ID
+	}
+
+	// a short page means we reached the end; only hand back a cursor when the
+	// page was full, so the caller knows there may be more
+	if len(rows) == limit {
+		last := rows[len(rows)-1]
+		next = &readCursor{ReadAt: last.ReadAt, Height: last.Height}
+	}
+
+	return ids, next, nil
+}
+
+// DeleteObjectCacheByID removes the tracking row for id. Used by purge to drop
+// rows whose repo object has gone missing; safe to call for an id with no row
+// (no-op).
+func (db *DB) DeleteObjectCacheByID(id *astral.ObjectID) error {
+	return db.DB.Where("id = ?", id).Delete(&dbObject{}).Error
 }
 
 func (db *DB) FindByType(objectType string) (rows []*dbObject, err error) {
diff --git a/mod/objects/src/db_object.go b/mod/objects/src/db_object.go
index 3f97a798..f7843057 100644
--- a/mod/objects/src/db_object.go
+++ b/mod/objects/src/db_object.go
@@ -1,15 +1,20 @@
 package objects
 
 import (
+	"time"
+
 	"github.com/cryptopunkscc/astrald/astral"
 	"github.com/cryptopunkscc/astrald/mod/objects"
-	"time"
 )
 
 type dbObject struct {
-	ID        *astral.ObjectID `gorm:"primaryKey"`
+	// Height is a monotonic, insert-ordered tiebreaker for the (read_at, height) purge cursor.
+	// NOTE(review): this moves the primary key from ID to Height — confirm existing tables are migrated accordingly.
+	Height    uint64           `gorm:"primaryKey;autoIncrement"`
+	ID        *astral.ObjectID `gorm:"uniqueIndex"`
 	Type      string           `gorm:"index"`
 	CreatedAt time.Time        `gorm:"index"`
+	ReadAt    time.Time        `gorm:"not null;default:CURRENT_TIMESTAMP;index"`
 }
 
 func (dbObject) TableName() string { return objects.DBPrefix + "objects" }
diff --git a/mod/objects/src/describe.go b/mod/objects/src/describe.go
index ec5dc506..22d55d30 100644
--- a/mod/objects/src/describe.go
+++ b/mod/objects/src/describe.go
@@ -56,8 +56,8 @@ func (mod *Module) AddDescriber(describer objects.Describer) error {
 		return err
 	}
 	if ok {
-		mod.external.mu.Lock()
-		defer mod.external.mu.Unlock()
+		mod.externalMu.Lock()
+		defer mod.externalMu.Unlock()
 
 		if containsSourceIdentity(&mod.describers, source) {
 			return nil
diff --git a/mod/objects/src/find.go b/mod/objects/src/find.go
index 8cd6dca3..abd73f52 100644
--- a/mod/objects/src/find.go
+++ b/mod/objects/src/find.go
@@ -57,8 +57,8 @@ func (mod *Module) AddFinder(finder objects.Finder) error {
 	}
 
 	if ok {
-		mod.external.mu.Lock()
-		defer mod.external.mu.Unlock()
+		mod.externalMu.Lock()
+		defer mod.externalMu.Unlock()
 
 		if containsSourceIdentity(&mod.finders, source) {
 			return nil
diff --git a/mod/objects/src/loader.go b/mod/objects/src/loader.go
index 689cc52c..6a88b294 100644
--- a/mod/objects/src/loader.go
+++ b/mod/objects/src/loader.go
@@ -22,7 +22,8 @@ func (Loader) Load(node astral.Node, assets assets.Assets, log *log.Logger) (cor
 
 	mod.router.AddStructPrefix(mod, "Op")
 
-	mod.db = &DB{assets.Database()}
+	mod.db = &DB{DB: assets.Database()}
+	mod.objectsReadsJournal = newObjectsReadsJournal(mod.db.UpdateReadAt, log)
 
 	mod.setupDefaultRepos()
 
diff --git a/mod/objects/src/module.go b/mod/objects/src/module.go
index e6ffd66b..2d50d413 100644
--- a/mod/objects/src/module.go
+++ b/mod/objects/src/module.go
@@ -47,11 +47,10 @@ type Module struct {
 	holders sig.Set[objects.Holder]
 	repos   sig.Map[string, objects.Repository]
 
-	external struct {
-		mu sync.Mutex
-	}
+	externalMu sync.Mutex
 
-	groups sig.Map[string, *RepoGroup]
+	groups              sig.Map[string, *RepoGroup]
+	objectsReadsJournal *objectsReadsJournal
 }
 
 func (mod *Module) Run(ctx *astral.Context) error {
@@ -59,6 +58,11 @@ func (mod *Module) Run(ctx *astral.Context) error {
 
 	<-ctx.Done()
 
+	err := mod.objectsReadsJournal.Flush()
+	if err != nil {
+		mod.log.Error("object reads journal: final flush: %v", err)
+	}
+
 	return nil
 }
 
@@ -74,11 +78,17 @@ func (mod *Module) Load(ctx *astral.Context, repo objects.Repository, objectID *
 		return nil, err
 	}
 
+	mod.objectsReadsJournal.Mark(objectID)
+
 	// parse the object
 	o, _, err := astral.Decode(bytes.NewReader(data), astral.Canonical())
 	switch {
 	case err == nil:
-		return o, nil // object successfully loaded
+		// decode succeeded, so type is known. blob branch below is
+		// intentionally not seeded — Type='' would shadow the "missing
+		// astral stamp" signal a later GetType call relies on.
+		mod.trackObject(objectID, o.ObjectType())
+		return o, nil
 	case strings.Contains(err.Error(), "invalid magic bytes"):
 		// the object is a blob
 		return (*astral.Blob)(&data), nil
@@ -99,7 +109,14 @@ func (mod *Module) Store(ctx *astral.Context, repo objects.Repository, object as
 		return nil, err
 	}
 
-	return w.Commit()
+	id, err := w.Commit()
+	if err != nil {
+		return nil, err
+	}
+
+	mod.trackObject(id, object.ObjectType())
+
+	return id, nil
 }
 
 // Deprecated: Use Probe instead.
@@ -130,14 +147,8 @@ func (mod *Module) GetType(ctx *astral.Context, objectID *astral.ObjectID) (obje
 		return "", err
 	}
 
-	// write to cache
-	err = mod.db.Create(objectID, t.String())
-	switch {
-	case err == nil:
-	case strings.Contains(err.Error(), "UNIQUE constraint failed"):
-	default:
-		mod.log.Error("onSave: db error: %v", err)
-	}
+	// seed dbObject (idempotent; existing rows are left alone)
+	mod.trackObject(objectID, t.String())
 
 	return t.String(), nil
 }
@@ -171,6 +182,9 @@ func (mod *Module) Probe(ctx *astral.Context, repo objects.Repository, objectID
 		_, err = t.ReadFrom(q)
 		if err == nil {
 			probe.Type = astral.String8(t.String())
+			// seed dbObject: stamp+type parsed cleanly, type is in hand.
+			// non-astral blobs fall through unseeded (same rationale as Load).
+			mod.trackObject(objectID, t.String())
 		}
 	}
 
@@ -184,6 +198,18 @@ func (mod *Module) AddSearchPreprocessor(pre objects.SearchPreprocessor) error {
 	return mod.searchPre.Add(pre)
 }
 
+// trackObject seeds the dbObject row for an object the module just
+// encountered. Failure is logged but not propagated — seeding is a side
+// effect; the calling op must not fail because the cache write did.
+// Re-seeding is harmless (db.Create is idempotent via INSERT OR IGNORE), so a
+// missed seed is picked up the next time any path touches the same object.
+func (mod *Module) trackObject(id *astral.ObjectID, objectType string) {
+	err := mod.db.Create(id, objectType)
+	if err != nil {
+		mod.log.Error("track object %v: %v", id, err)
+	}
+}
+
 // getRepoName returns the name of a repository
 func (mod *Module) getRepoName(repo objects.Repository) string {
 	for name, r := range mod.repos.Clone() {
diff --git a/mod/objects/src/objects_reads_journal.go b/mod/objects/src/objects_reads_journal.go
new file mode 100644
index 00000000..79731162
--- /dev/null
+++ b/mod/objects/src/objects_reads_journal.go
@@ -0,0 +1,82 @@
+package objects
+
+import (
+	"sync"
+
+	"github.com/cryptopunkscc/astrald/astral"
+	"github.com/cryptopunkscc/astrald/astral/log"
+)
+
+// objectsReadsSink persists a batch of last-read times to durable storage.
+type objectsReadsSink func(map[astral.ObjectID]astral.Time) error
+
+// objectsReadsJournal records qualified object reads in memory and flushes them
+// to a sink in batches. The hot read path (Mark) never touches the DB; flushing
+// is caller-driven (Flush), wired to shutdown and purge.
+type objectsReadsJournal struct {
+	log *log.Logger
+	mu  sync.Mutex
+
+	pending map[astral.ObjectID]astral.Time // last read time per object
+	sink    objectsReadsSink
+}
+
+func newObjectsReadsJournal(sink objectsReadsSink, log *log.Logger) *objectsReadsJournal {
+	return &objectsReadsJournal{
+		pending: map[astral.ObjectID]astral.Time{},
+		sink:    sink,
+		log:     log,
+	}
+}
+
+// Mark records a qualified read of id at the current time. Hot path: O(1), no DB I/O.
+func (j *objectsReadsJournal) Mark(id *astral.ObjectID) {
+	if id == nil {
+		return
+	}
+	j.mu.Lock()
+	j.pending[*id] = astral.Now()
+	j.mu.Unlock()
+}
+
+// drain atomically takes and clears the pending set, returning nil when empty.
+func (j *objectsReadsJournal) drain() map[astral.ObjectID]astral.Time {
+	j.mu.Lock()
+	defer j.mu.Unlock()
+	if len(j.pending) == 0 {
+		return nil
+	}
+	out := j.pending
+	j.pending = map[astral.ObjectID]astral.Time{}
+	return out
+}
+
+// Flush writes pending reads to the sink synchronously. Safe for concurrent use:
+// drain hands each entry to exactly one flusher. On sink error the batch is
+// re-merged so it isn't lost (keeping the newest time).
+func (j *objectsReadsJournal) Flush() error {
+	batch := j.drain()
+	if batch == nil {
+		return nil
+	}
+
+	err := j.sink(batch)
+	if err != nil {
+		j.remerge(batch)
+		return err
+	}
+
+	return nil
+}
+
+// remerge puts a failed batch back without clobbering newer marks.
+func (j *objectsReadsJournal) remerge(batch map[astral.ObjectID]astral.Time) {
+	j.mu.Lock()
+	defer j.mu.Unlock()
+	for k, v := range batch {
+		cur, ok := j.pending[k]
+		if !ok || v.Time().After(cur.Time()) {
+			j.pending[k] = v
+		}
+	}
+}
diff --git a/mod/objects/src/op_create.go b/mod/objects/src/op_create.go
index b243e9f6..a1a05327 100644
--- a/mod/objects/src/op_create.go
+++ b/mod/objects/src/op_create.go
@@ -58,6 +58,15 @@ func (mod *Module) OpCreate(ctx *astral.Context, q *routing.IncomingQuery, args
 
 			mod.log.Logv(3, "%v created %v in %v", q.Caller(), objectID, repo)
 
+			// seed dbObject: type isn't known from the raw blob stream, so
+			// piggy-back on Probe — it reads the stamp from the just-written
+			// repo and seeds via mod.trackObject. Errors are non-fatal; the
+			// row will be lazily seeded by the next path that touches the object.
+			_, perr := mod.Probe(ctx, repo, objectID)
+			if perr != nil {
+				mod.log.Logv(3, "OpCreate: probe-seed for %v failed: %v", objectID, perr)
+			}
+
 			return ch.Send(objectID)
 
 		default:
diff --git a/mod/objects/src/op_read.go b/mod/objects/src/op_read.go
index 69ae30b9..0f6d53a5 100644
--- a/mod/objects/src/op_read.go
+++ b/mod/objects/src/op_read.go
@@ -48,6 +48,8 @@ func (mod *Module) OpRead(ctx *astral.Context, q *routing.IncomingQuery, args op
 	}
 	defer r.Close()
 
+	mod.objectsReadsJournal.Mark(args.ID)
+
 	conn := q.AcceptRaw()
 	defer conn.Close()
 
diff --git a/mod/objects/src/purge.go b/mod/objects/src/purge.go
index 6e31b1dc..e97cdb1c 100644
--- a/mod/objects/src/purge.go
+++ b/mod/objects/src/purge.go
@@ -15,43 +15,69 @@ func (mod *Module) purgeRepository(ctx *astral.Context, repo objects.Repository)
 	go func() {
 		defer close(out)
 
-		scan, err := repo.Scan(ctx, false)
+		// flush pending reads so the read order reflects the latest accesses
+		err := mod.objectsReadsJournal.Flush()
 		if err != nil {
-			*errPtr = err
-			return
+			mod.log.Error("object reads journal: purge flush: %v", err)
 		}
 
-		seen := map[string]bool{}
-		for id := range scan {
-			// note: this is n-th time i see classic dedup pattern, maybe we will create a helper
-			if id == nil {
-				continue
+		// purge in read order: oldest-read objects first, paged via keyset cursor
+		var after *readCursor
+		for {
+			select {
+			case <-ctx.Done():
+				*errPtr = ctx.Err()
+				return
+			default:
 			}
 
-			key := id.String()
-			if seen[key] {
-				continue
+			ids, next, err := mod.db.ListReadOldest(after, 256)
+			if err != nil {
+				*errPtr = err
+				return
 			}
-			seen[key] = true
 
-			if len(mod.Holders(id)) > 0 {
-				continue
-			}
+			for _, id := range ids {
+				// defensive: ID is a pointer column with no NOT NULL constraint,
+				// so a row may decode to a nil id — never hand that to Holders/Delete
+				if id == nil {
+					continue
+				}
+
+				if len(mod.Holders(id)) > 0 {
+					continue
+				}
 
-			err := repo.Delete(ctx, id)
-			if err != nil {
-				if errors.Is(err, objects.ErrNotFound) {
+				err := repo.Delete(ctx, id)
+				switch {
+				case err == nil:
+					err = sig.Send(ctx, out, id)
+					if err != nil {
+						*errPtr = err
+						return
+					}
+
+				case errors.Is(err, objects.ErrNotFound):
+					// NOTE(review): tracking rows are keyed by object ID only, so
+					// ErrNotFound from this repo does not prove the object is gone
+					// from every repo — confirm dropping the row here is intended.
+					derr := mod.db.DeleteObjectCacheByID(id)
+					if derr != nil {
+						mod.log.Error("purge: drop stale cache row %v: %v", id, derr)
+					}
+					continue
+				case errors.Is(err, errors.ErrUnsupported):
 					continue
+				default:
+					*errPtr = err
+					return
 				}
-				*errPtr = err
-				return
 			}
 
-			err = sig.Send(ctx, out, id)
-			if err != nil {
-				*errPtr = err
-				return
+			if next == nil {
+				break
 			}
+			after = next
 		}
 	}()
 
diff --git a/mod/objects/src/search.go b/mod/objects/src/search.go
index 75e8d753..ba7da38f 100644
--- a/mod/objects/src/search.go
+++ b/mod/objects/src/search.go
@@ -111,8 +111,8 @@ func (mod *Module) AddSearcher(searcher objects.Searcher) error {
 		return err
 	}
 	if ok {
-		mod.external.mu.Lock()
-		defer mod.external.mu.Unlock()
+		mod.externalMu.Lock()
+		defer mod.externalMu.Unlock()
 
 		if containsSourceIdentity(&mod.searchers, source) {
 			return nil