Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 85 additions & 5 deletions mod/objects/src/db.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package objects

import (
"time"

"github.com/cryptopunkscc/astrald/astral"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type DB struct {
Expand All @@ -29,12 +32,89 @@ func (db *DB) Find(id *astral.ObjectID) (row *dbObject, err error) {
return
}

func (db *DB) Create(id *astral.ObjectID, objectType string) (err error) {
err = db.DB.Create(&dbObject{
ID: id,
Type: objectType,
// Create seeds a tracking row for id with the given type. Idempotent: if a
// row for id already exists, the call is a no-op (existing Type and ReadAt
// are preserved). Used by every "object entered the device" path —
// Module.Store/Load/Probe/GetType and OpCreate — to keep dbObject in sync
// with what flows through the module.
func (db *DB) Create(id *astral.ObjectID, objectType string) error {
return db.DB.Clauses(clause.OnConflict{DoNothing: true}).Create(&dbObject{
ID: id,
Type: objectType,
ReadAt: time.Now(),
}).Error
return
}

// UpdateReadAt persists a batch of pending read timestamps, running every
// statement inside one transaction on a prepared-statement session.
//
// Only UPDATE statements are issued: an id that already has a row gets its
// read_at column overwritten with the supplied time, while an id that is not
// tracked matches nothing and is skipped. Rows are never inserted here, so a
// first read does not seed tracking.
//
// An empty or nil batch returns nil without touching the database. The first
// failing update stops the loop and its error is returned from the
// transaction callback.
func (db *DB) UpdateReadAt(reads map[astral.ObjectID]astral.Time) error {
	if len(reads) == 0 {
		return nil
	}

	// applyBatch issues one UPDATE per pending entry on the transaction handle.
	applyBatch := func(tx *gorm.DB) error {
		for objectID, readAt := range reads {
			res := tx.Model(&dbObject{}).
				Where("id = ?", &objectID).
				Update("read_at", readAt.Time())
			if res.Error != nil {
				return res.Error
			}
		}
		return nil
	}

	session := db.DB.Session(&gorm.Session{PrepareStmt: true})
	return session.Transaction(applyBatch)
}

// readCursor identifies a row's position in the (read_at, height) ordering
// used to page through objects from least to most recently read.
type readCursor struct {
	// ReadAt is the read_at value of the row the cursor points at.
	ReadAt time.Time
	// Height is that row's height, breaking ties between equal ReadAt values.
	Height uint64
}

// ListReadOldest pages through tracked object IDs from the least recently
// read to the most recently read.
//
// after is the cursor returned by the previous call; nil starts from the
// beginning. limit caps the page size, and a non-positive limit yields
// (nil, nil, nil) without querying. next is non-nil only when the page came
// back full, meaning more rows may follow; a short page signals the end and
// next is nil.
func (db *DB) ListReadOldest(after *readCursor, limit int) (ids []*astral.ObjectID, next *readCursor, err error) {
	if limit <= 0 {
		return nil, nil, nil
	}

	query := db.DB.Order("read_at, height").Limit(limit)
	if after != nil {
		// Keyset seek using a row-value comparison: height is unique, so
		// (read_at, height) is a total order and the seek resumes strictly
		// after the cursor row. The read_at index entries implicitly carry
		// height (the rowid) as their tiebreaker, so the comparison maps
		// directly onto that index.
		query = query.Where("(read_at, height) > (?, ?)", after.ReadAt, after.Height)
	}

	var page []*dbObject
	if err = query.Find(&page).Error; err != nil {
		return nil, nil, err
	}

	ids = make([]*astral.ObjectID, 0, len(page))
	for _, obj := range page {
		ids = append(ids, obj.ID)
	}

	// Anything other than a full page means the end was reached: no cursor.
	if len(page) != limit {
		return ids, nil, nil
	}

	tail := page[len(page)-1]
	return ids, &readCursor{ReadAt: tail.ReadAt, Height: tail.Height}, nil
}

// DeleteObjectCacheByID deletes the tracking row whose id column matches id.
// Purge uses it to drop rows whose repo object has gone missing. Calling it
// for an id that has no row is a harmless no-op.
func (db *DB) DeleteObjectCacheByID(id *astral.ObjectID) error {
	res := db.DB.Where("id = ?", id).Delete(&dbObject{})
	return res.Error
}

func (db *DB) FindByType(objectType string) (rows []*dbObject, err error) {
Expand Down
8 changes: 6 additions & 2 deletions mod/objects/src/db_object.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package objects

import (
"time"

"github.com/cryptopunkscc/astrald/astral"
"github.com/cryptopunkscc/astrald/mod/objects"
"time"
)

type dbObject struct {
ID *astral.ObjectID `gorm:"primaryKey"`
// Height is a monotonic, insert-ordered tiebreaker for the (read_at, height) purge cursor.
Height uint64 `gorm:"primaryKey;autoIncrement"`
ID *astral.ObjectID `gorm:"uniqueIndex"`
Type string `gorm:"index"`
CreatedAt time.Time `gorm:"index"`
ReadAt time.Time `gorm:"not null;default:CURRENT_TIMESTAMP;index"`
}

func (dbObject) TableName() string { return objects.DBPrefix + "objects" }
4 changes: 2 additions & 2 deletions mod/objects/src/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (mod *Module) AddDescriber(describer objects.Describer) error {
return err
}
if ok {
mod.external.mu.Lock()
defer mod.external.mu.Unlock()
mod.externalMu.Lock()
defer mod.externalMu.Unlock()

if containsSourceIdentity(&mod.describers, source) {
return nil
Expand Down
4 changes: 2 additions & 2 deletions mod/objects/src/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func (mod *Module) AddFinder(finder objects.Finder) error {
}

if ok {
mod.external.mu.Lock()
defer mod.external.mu.Unlock()
mod.externalMu.Lock()
defer mod.externalMu.Unlock()

if containsSourceIdentity(&mod.finders, source) {
return nil
Expand Down
3 changes: 2 additions & 1 deletion mod/objects/src/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ func (Loader) Load(node astral.Node, assets assets.Assets, log *log.Logger) (cor

mod.router.AddStructPrefix(mod, "Op")

mod.db = &DB{assets.Database()}
mod.db = &DB{DB: assets.Database()}
mod.objectsReadsJournal = newObjectsReadsJournal(mod.db.UpdateReadAt, log)

mod.setupDefaultRepos()

Expand Down
54 changes: 40 additions & 14 deletions mod/objects/src/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,22 @@ type Module struct {
holders sig.Set[objects.Holder]
repos sig.Map[string, objects.Repository]

external struct {
mu sync.Mutex
}
externalMu sync.Mutex

groups sig.Map[string, *RepoGroup]
groups sig.Map[string, *RepoGroup]
objectsReadsJournal *objectsReadsJournal
}

// Run stores ctx on the module and blocks until ctx is done. It then flushes
// the object reads journal one last time so pending read times are handed to
// the sink before returning. A failed flush is logged, not returned: Run
// always returns nil.
func (mod *Module) Run(ctx *astral.Context) error {
	mod.ctx = ctx

	<-ctx.Done()

	if flushErr := mod.objectsReadsJournal.Flush(); flushErr != nil {
		mod.log.Error("object reads journal: final flush: %v", flushErr)
	}

	return nil
}

Expand All @@ -74,11 +78,17 @@ func (mod *Module) Load(ctx *astral.Context, repo objects.Repository, objectID *
return nil, err
}

mod.objectsReadsJournal.Mark(objectID)

// parse the object
o, _, err := astral.Decode(bytes.NewReader(data), astral.Canonical())
switch {
case err == nil:
return o, nil // object successfully loaded
// decode succeeded, so type is known. blob branch below is
// intentionally not seeded — Type='' would shadow the "missing
// astral stamp" signal a later GetType call relies on.
mod.trackObject(objectID, o.ObjectType())
return o, nil

case strings.Contains(err.Error(), "invalid magic bytes"): // the object is a blob
return (*astral.Blob)(&data), nil
Expand All @@ -99,7 +109,14 @@ func (mod *Module) Store(ctx *astral.Context, repo objects.Repository, object as
return nil, err
}

return w.Commit()
id, err := w.Commit()
if err != nil {
return nil, err
}

mod.trackObject(id, object.ObjectType())

return id, nil
}

// Deprecated: Use Probe instead.
Expand Down Expand Up @@ -130,14 +147,8 @@ func (mod *Module) GetType(ctx *astral.Context, objectID *astral.ObjectID) (obje
return "", err
}

// write to cache
err = mod.db.Create(objectID, t.String())
switch {
case err == nil:
case strings.Contains(err.Error(), "UNIQUE constraint failed"):
default:
mod.log.Error("onSave: db error: %v", err)
}
// seed dbObject (idempotent; existing rows are left alone)
mod.trackObject(objectID, t.String())

return t.String(), nil
}
Expand Down Expand Up @@ -171,6 +182,9 @@ func (mod *Module) Probe(ctx *astral.Context, repo objects.Repository, objectID
_, err = t.ReadFrom(q)
if err == nil {
probe.Type = astral.String8(t.String())
// seed dbObject: stamp+type parsed cleanly, type is in hand.
// non-astral blobs fall through unseeded (same rationale as Load).
mod.trackObject(objectID, t.String())
}
}

Expand All @@ -184,6 +198,18 @@ func (mod *Module) AddSearchPreprocessor(pre objects.SearchPreprocessor) error {
return mod.searchPre.Add(pre)
}

// trackObject makes sure a dbObject row exists for an object the module has
// just come across. Seeding is a best-effort side effect: a database error is
// logged and swallowed so the calling operation never fails because of the
// cache write. db.Create ignores conflicts with an existing row, so calling
// this repeatedly is harmless and a missed seed is made up for the next time
// any path touches the same object.
func (mod *Module) trackObject(id *astral.ObjectID, objectType string) {
	if err := mod.db.Create(id, objectType); err != nil {
		mod.log.Error("track object %v: %v", id, err)
	}
}

// getRepoName returns the name of a repository
func (mod *Module) getRepoName(repo objects.Repository) string {
for name, r := range mod.repos.Clone() {
Expand Down
82 changes: 82 additions & 0 deletions mod/objects/src/objects_reads_journal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package objects

import (
"sync"

"github.com/cryptopunkscc/astrald/astral"
"github.com/cryptopunkscc/astrald/astral/log"
)

// objectsReadsSink is the persistence callback an objectsReadsJournal flushes
// into. It receives a batch mapping each object ID to its last-read time and
// returns an error if the batch could not be stored; the journal's Flush puts
// a failed batch back into its pending set. The module loader wires
// DB.UpdateReadAt in as the sink.
type objectsReadsSink func(map[astral.ObjectID]astral.Time) error

// objectsReadsJournal buffers object read times in memory and hands them to a
// sink in batches. Mark, on the hot read path, only updates the in-memory map
// and never touches the database. Nothing is flushed automatically: callers
// invoke Flush explicitly (wired to shutdown and purge).
type objectsReadsJournal struct {
	log  *log.Logger
	sink objectsReadsSink // receives each drained batch

	mu      sync.Mutex                      // guards pending
	pending map[astral.ObjectID]astral.Time // most recent read time per object
}

// newObjectsReadsJournal returns a journal with an empty pending set that
// flushes into sink and keeps log as its logger.
func newObjectsReadsJournal(sink objectsReadsSink, log *log.Logger) *objectsReadsJournal {
	j := new(objectsReadsJournal)
	j.log = log
	j.sink = sink
	j.pending = make(map[astral.ObjectID]astral.Time)
	return j
}

// Mark notes that id was read just now, replacing any earlier pending time
// for the same object. A nil id is ignored. This is the hot path: it does a
// single map write under the mutex and no database I/O.
func (j *objectsReadsJournal) Mark(id *astral.ObjectID) {
	if id == nil {
		return
	}

	key := *id

	j.mu.Lock()
	j.pending[key] = astral.Now()
	j.mu.Unlock()
}

// drain takes the whole pending set and leaves a fresh empty map in its
// place, all under the mutex. It returns nil when nothing is pending.
func (j *objectsReadsJournal) drain() map[astral.ObjectID]astral.Time {
	j.mu.Lock()
	defer j.mu.Unlock()

	taken := j.pending
	if len(taken) == 0 {
		return nil
	}

	j.pending = make(map[astral.ObjectID]astral.Time)
	return taken
}

// Flush synchronously hands all pending reads to the sink. With nothing
// pending it returns nil without calling the sink. It is safe to call
// concurrently: drain gives each pending entry to exactly one caller. When
// the sink fails, the batch is merged back into the pending set (the newer
// time wins per object) and the sink's error is returned.
func (j *objectsReadsJournal) Flush() error {
	pending := j.drain()
	if pending == nil {
		return nil
	}

	sinkErr := j.sink(pending)
	if sinkErr == nil {
		return nil
	}

	// Keep the failed batch so a later Flush can retry it.
	j.remerge(pending)
	return sinkErr
}

// remerge returns a batch that failed to flush to the pending set. An entry
// is restored when the object has no pending time, or when the batch time is
// strictly later than the pending one; otherwise the mark recorded since the
// drain is kept.
func (j *objectsReadsJournal) remerge(batch map[astral.ObjectID]astral.Time) {
	j.mu.Lock()
	defer j.mu.Unlock()

	for id, failedAt := range batch {
		if current, exists := j.pending[id]; exists && !failedAt.Time().After(current.Time()) {
			continue
		}
		j.pending[id] = failedAt
	}
}
9 changes: 9 additions & 0 deletions mod/objects/src/op_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ func (mod *Module) OpCreate(ctx *astral.Context, q *routing.IncomingQuery, args

mod.log.Logv(3, "%v created %v in %v", q.Caller(), objectID, repo)

// seed dbObject: type isn't known from the raw blob stream, so
// piggy-back on Probe — it reads the stamp from the just-written
// repo and seeds via mod.trackObject. Errors are non-fatal; the
// row will be lazily seeded by the next path that touches the object.
_, perr := mod.Probe(ctx, repo, objectID)
if perr != nil {
mod.log.Logv(3, "OpCreate: probe-seed for %v failed: %v", objectID, perr)
}

return ch.Send(objectID)

default:
Expand Down
2 changes: 2 additions & 0 deletions mod/objects/src/op_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func (mod *Module) OpRead(ctx *astral.Context, q *routing.IncomingQuery, args op
}
defer r.Close()

mod.objectsReadsJournal.Mark(args.ID)

conn := q.AcceptRaw()
defer conn.Close()

Expand Down
Loading