diff --git a/crdt.go b/crdt.go index 595d8e7..28a4656 100644 --- a/crdt.go +++ b/crdt.go @@ -76,6 +76,52 @@ type SessionDAGService interface { Session(context.Context) ipld.NodeGetter } +// PutEvent describes a materialised-view update delivered to PutHook. +type PutEvent struct { + // Key whose value was updated. + Key ds.Key + // OldValue is the previous value for the key, if any. It is populated only + // when HookLoadPreviousValue is true, otherwise it is nil. + OldValue []byte + // NewValue is the new value for the key after the update. + NewValue []byte + // OldPriority is the priority of OldValue. It is populated only when + // HookLoadPreviousValue is true, otherwise it is 0. It is also 0 when the + // key had no prior value (real deltas always have priority >= 1, so a zero + // OldPriority means "no previous value"); in that case OldValue is nil as + // well. + OldPriority uint64 + // NewPriority is the priority of NewValue. For normal puts this matches + // Delta.GetPriority(). For partial-tombstone puts (where a tombstone removed + // the previous winner and a surviving element took over), NewPriority is the + // surviving element's priority, which differs from the tombstone delta's + // priority. + NewPriority uint64 + // Delta is the triggering delta: the one whose merge caused the state + // change. For partial-tombstone puts (where a tombstone removed the previous + // winner and a surviving element took over), Delta is the tombstone delta, + // not the older delta that originally wrote the surviving value. Delta is + // nil when the update did not originate from a merged delta (e.g. PurgeDAG). + // Consumers must not retain Delta past the callback or mutate it. + Delta Delta +} + +// DeleteEvent describes a materialised-view removal delivered to DeleteHook. +type DeleteEvent struct { + // Key whose value was updated. + Key ds.Key + // LastValue is the previous value for the key, if any. It is populated only + // when HookLoadPreviousValue is true, otherwise it is nil. + LastValue []byte + // LastPriority is the priority of LastValue. It is populated only when + // HookLoadPreviousValue is true, otherwise it is 0. + LastPriority uint64 + // Delta is the tombstone delta that triggered the removal, or nil when the + // removal did not originate from a merged delta (e.g. PurgeDAG). Consumers + // must not retain Delta past the callback or mutate it. + Delta Delta +} + // Options holds configurable values for Datastore. type Options struct { Logger logging.StandardLogger @@ -85,19 +131,45 @@ type Options struct { // been already broadcasted by other replicas in the // interval. Default: 1m. RebroadcastInterval time.Duration - // The PutHook function is triggered whenever an element - // is successfully added to the datastore (either by a local - // or remote update), and only when that addition is considered the - // prevalent value. Default: nil. - PutHook func(k ds.Key, v []byte) - // The DeleteHook function is triggered whenever a version of an - // element is successfully removed from the datastore (either by a - // local or remote update). Unordered and concurrent updates may - // result in the DeleteHook being triggered even though the element is - // still present in the datastore because it was re-added or not fully - // tombstoned. If that is relevant, use Has() to check if the removed - // element is still part of the datastore. Default: nil. - DeleteHook func(k ds.Key) + // PutHook is triggered whenever the materialised view for a key is + // updated (either by a local or remote update). It fires once per key + // per merged delta, even when the delta carries multiple elements for + // the same key — in that case the lex-largest value wins at the shared + // delta priority and only the winner is reported. For partial-tombstone + // puts (where a tombstone removed the previous winner and a surviving + // element took over), the hook fires with the surviving value. + // Default: nil. + // + // PutEvent.OldValue and PutEvent.OldPriority are populated only when + // HookLoadPreviousValue is true, otherwise they are zero-valued. When + // HookLoadPreviousValue is true, the hook is not fired for partial + // tombstones where the winning element is unchanged (same value and + // priority); a same-value/different-priority swap still fires the hook. + // + // The callback is invoked while internal locks are held; it must not + // call back into the Datastore or it will deadlock. + PutHook func(PutEvent) + // DeleteHook is triggered whenever an element is fully removed from the + // datastore (either by a local or remote update), i.e. when no surviving + // element exists for the key after tombstone processing. Default: nil. + // + // DeleteEvent.LastValue and DeleteEvent.LastPriority are populated only when + // HookLoadPreviousValue is true, otherwise they are zero-valued. When + // HookLoadPreviousValue is true, the hook is not fired for tombstones + // targeting keys that had no prior value in the datastore. + // + // The callback is invoked while internal locks are held; it must not + // call back into the Datastore or it will deadlock. + DeleteHook func(DeleteEvent) + // HookLoadPreviousValue controls whether PutHook/DeleteHook receive the + // previous value and priority for the key. When true, the datastore is read + // before each hook-relevant write so that PutEvent.OldValue / + // PutEvent.OldPriority and DeleteEvent.LastValue / DeleteEvent.LastPriority + // can be populated; this incurs one extra read per triggered hook. When + // true, the hook is also skipped when the observed value would not actually + // change (see PutHook/DeleteHook doc). + // Default: false. + HookLoadPreviousValue bool // NumWorkers specifies the number of workers ready to // retrieve and merge deltas while walking the DAGs. Default: // 5. @@ -183,12 +255,13 @@ func (opts *Options) verify() error { // DefaultOptions initializes an Options object with sensible defaults. func DefaultOptions() *Options { return &Options{ - Logger: logging.Logger("crdt"), - RebroadcastInterval: time.Minute, - PutHook: nil, - DeleteHook: nil, - NumWorkers: 5, - DAGSyncerTimeout: 5 * time.Minute, + Logger: logging.Logger("crdt"), + RebroadcastInterval: time.Minute, + PutHook: nil, + DeleteHook: nil, + HookLoadPreviousValue: false, + NumWorkers: 5, + DAGSyncerTimeout: 5 * time.Minute, // always keeping // https://github.com/libp2p/go-libp2p-core/blob/master/network/network.go#L23 // in sight @@ -292,6 +365,12 @@ func New( return nil, err } + hooks := setHooks{ + putHook: opts.PutHook, + deleteHook: opts.DeleteHook, + hookLoadPreviousValue: opts.HookLoadPreviousValue, + } + // /set fullSetNs := namespace.ChildString(opts.crdtOpts.Namespaces.Set) // /heads @@ -300,24 +379,8 @@ func New( // /heads fullDagHeadsNs := namespace.ChildString(opts.crdtOpts.Namespaces.DAGHeads) - setPutHook := func(k string, v []byte) { - if opts.PutHook == nil { - return - } - dsk := ds.NewKey(k) - opts.PutHook(dsk, v) - } - - setDeleteHook := func(k string) { - if opts.DeleteHook == nil { - return - } - dsk := ds.NewKey(k) - opts.DeleteHook(dsk) - } - ctx, cancel := context.WithCancel(context.Background()) - set, err := newCRDTSet(ctx, store, fullSetNs, dagSyncer, opts.Logger, setPutHook, setDeleteHook, opts.crdtOpts.DeltaFactory) + set, err := newCRDTSet(ctx, store, fullSetNs, dagSyncer, opts.Logger, hooks, opts.crdtOpts.DeltaFactory) if err != nil { cancel() return nil, fmt.Errorf("error setting up crdt set: %w", err) diff --git a/crdt_test.go b/crdt_test.go index 32b96aa..b2b463d 100644 --- a/crdt_test.go +++ b/crdt_test.go @@ -9,6 +9,7 @@ import ( "sync" "sync/atomic" "testing" + "testing/synctest" "time" blockstore "github.com/ipfs/boxo/blockstore" @@ -566,39 +567,183 @@ func TestCRDTPrintDAG(t *testing.T) { } func TestCRDTHooks(t *testing.T) { - ctx := context.Background() + synctest.Test(t, func(t *testing.T) { + ctx := t.Context() + var put int64 + var deleted int64 + + opts := DefaultOptions() + opts.PutHook = func(PutEvent) { + atomic.AddInt64(&put, 1) + } + opts.DeleteHook = func(DeleteEvent) { + atomic.AddInt64(&deleted, 1) + } - var put int64 - var deleted int64 + replicas, closeReplicas := makeReplicas(t, opts) + t.Cleanup(closeReplicas) - opts := DefaultOptions() - opts.PutHook = func(k ds.Key, v []byte) { - atomic.AddInt64(&put, 1) - } - opts.DeleteHook = func(k ds.Key) { - atomic.AddInt64(&deleted, 1) - } + k := ds.RandomKey() + if err := replicas[0].Put(ctx, k, nil); err != nil { + t.Fatal(err) + } + if err := replicas[0].Delete(ctx, k); err != nil { + t.Fatal(err) + } + synctest.Wait() + if atomic.LoadInt64(&put) != int64(len(replicas)) { + t.Error("all replicas should have notified Put", put) + } + if atomic.LoadInt64(&deleted) != int64(len(replicas)) { + t.Error("all replicas should have notified Remove", deleted) + } + }) +} - replicas, closeReplicas := makeReplicas(t, opts) - defer closeReplicas() +// TestCRDTPutHookLoadsPreviousValue verifies that PutHook receives OldValue +// populated when HookLoadPreviousValue is enabled. +func TestCRDTPutHookLoadsPreviousValue(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + type hookCall struct { + key ds.Key + newVal []byte + oldVal []byte + prio uint64 + } + var calls []hookCall + var mu sync.Mutex + + opts := DefaultOptions() + opts.HookLoadPreviousValue = true + opts.PutHook = func(e PutEvent) { + mu.Lock() + var prio uint64 + if e.Delta != nil { + prio = e.Delta.GetPriority() + } + calls = append(calls, hookCall{e.Key, e.NewValue, e.OldValue, prio}) + mu.Unlock() + } - k := ds.RandomKey() - err := replicas[0].Put(ctx, k, nil) - if err != nil { - t.Fatal(err) - } + replicas, closeReplicas := makeReplicas(t, opts) + t.Cleanup(closeReplicas) - err = replicas[0].Delete(ctx, k) - if err != nil { - t.Fatal(err) - } - time.Sleep(100 * time.Millisecond) - if atomic.LoadInt64(&put) != int64(len(replicas)) { - t.Error("all replicas should have notified Put", put) - } - if atomic.LoadInt64(&deleted) != int64(len(replicas)) { - t.Error("all replicas should have notified Remove", deleted) - } + k := ds.NewKey("/testkey") + ctx := t.Context() + + // First put: all replicas should fire PutHook with newVal="first", oldVal=nil. + if err := replicas[0].Put(ctx, k, []byte("first")); err != nil { + t.Fatal(err) + } + synctest.Wait() + + mu.Lock() + if len(calls) != len(replicas) { + t.Errorf("expected %d PutHook calls (one per replica), got %d", len(replicas), len(calls)) + } + for i, c := range calls { + if string(c.newVal) != "first" { + t.Errorf("call[%d]: expected newVal %q, got %q", i, "first", c.newVal) + } + if c.oldVal != nil { + t.Errorf("call[%d]: expected oldVal nil on first put, got %q", i, c.oldVal) + } + if c.prio == 0 { + t.Errorf("call[%d]: expected non-zero delta priority, got 0 (delta not forwarded?)", i) + } + } + calls = nil + mu.Unlock() + + // Second put: all replicas should fire PutHook with newVal="second", oldVal="first". + if err := replicas[0].Put(ctx, k, []byte("second")); err != nil { + t.Fatal(err) + } + synctest.Wait() + + mu.Lock() + defer mu.Unlock() + if len(calls) != len(replicas) { + t.Errorf("expected %d PutHook calls on second put, got %d", len(replicas), len(calls)) + } + for i, c := range calls { + if string(c.newVal) != "second" { + t.Errorf("call[%d]: expected newVal %q, got %q", i, "second", c.newVal) + } + if string(c.oldVal) != "first" { + t.Errorf("call[%d]: expected oldVal %q, got %q", i, "first", c.oldVal) + } + } + }) +} + +// TestCRDTDeleteHookLoadsPreviousValue verifies that DeleteHook receives +// LastValue populated when HookLoadPreviousValue is enabled, and is suppressed for +// tombstones targeting keys with no prior value. +func TestCRDTDeleteHookLoadsPreviousValue(t *testing.T) { + t.Parallel() + synctest.Test(t, func(t *testing.T) { + type hookCall struct { + key ds.Key + lastVal []byte + hasDelta bool + } + var calls []hookCall + var mu sync.Mutex + + opts := DefaultOptions() + opts.HookLoadPreviousValue = true + opts.DeleteHook = func(e DeleteEvent) { + mu.Lock() + calls = append(calls, hookCall{e.Key, e.LastValue, e.Delta != nil}) + mu.Unlock() + } + + replicas, closeReplicas := makeReplicas(t, opts) + t.Cleanup(closeReplicas) + + ctx := t.Context() + k := ds.NewKey("/testkey") + if err := replicas[0].Put(ctx, k, []byte("hello")); err != nil { + t.Fatal(err) + } + synctest.Wait() + + if err := replicas[0].Delete(ctx, k); err != nil { + t.Fatal(err) + } + synctest.Wait() + + mu.Lock() + if len(calls) != len(replicas) { + t.Errorf("expected %d DeleteHook calls (one per replica), got %d", len(replicas), len(calls)) + } + for i, c := range calls { + if string(c.lastVal) != "hello" { + t.Errorf("call[%d]: expected lastVal %q, got %q", i, "hello", c.lastVal) + } + if !c.hasDelta { + t.Errorf("call[%d]: expected non-nil Delta from tombstone merge, got nil", i) + } + } + calls = nil + mu.Unlock() + + // Delete the same key again — it no longer exists. Rmv produces no + // tombstones so Delete returns early without publishing — the hook + // must not fire. + if err := replicas[0].Delete(ctx, k); err != nil { + t.Fatal(err) + } + synctest.Wait() + + mu.Lock() + defer mu.Unlock() + if n := len(calls); n != 0 { + t.Errorf("DeleteHook should not be called for a non-existent key, got %d calls", n) + } + }) } func TestCRDTBatch(t *testing.T) { diff --git a/examples/globaldb/globaldb.go b/examples/globaldb/globaldb.go index 8a30047..910850e 100644 --- a/examples/globaldb/globaldb.go +++ b/examples/globaldb/globaldb.go @@ -193,11 +193,11 @@ func main() { opts := crdt.DefaultOptions() opts.Logger = logger opts.RebroadcastInterval = 5 * time.Second - opts.PutHook = func(k ds.Key, v []byte) { - fmt.Printf("Added: [%s] -> %s\n", k, string(v)) + opts.PutHook = func(e crdt.PutEvent) { + fmt.Printf("Added: [%s] -> %s\n", e.Key, string(e.NewValue)) } - opts.DeleteHook = func(k ds.Key) { - fmt.Printf("Removed: [%s]\n", k) + opts.DeleteHook = func(e crdt.DeleteEvent) { + fmt.Printf("Removed: [%s]\n", e.Key) } crdt, err := crdt.New(store, ds.NewKey("crdt"), ipfs, pubsubBC, opts) diff --git a/set.go b/set.go index fbe5b10..d9d6331 100644 --- a/set.go +++ b/set.go @@ -36,6 +36,22 @@ type Set interface { InSet(ctx context.Context, key string) (bool, error) } +// setHooks holds the hook functions for put and delete events and whether +// the hooks should be invoked with the previous value loaded from the store. +type setHooks struct { + putHook func(PutEvent) + deleteHook func(DeleteEvent) + hookLoadPreviousValue bool +} + +// keyState pairs a materialised-view value with its priority. Used inside +// putTombs to snapshot the pre-tombstone state and track the post-tombstone +// winner without maintaining parallel value/priority maps. +type keyState struct { + value []byte + priority uint64 +} + // set implements an Add-Wins Observed-Remove Set using delta-CRDTs // (https://arxiv.org/abs/1410.2803) and backing all the data in a // go-datastore. It is fully agnostic to MerkleCRDTs or the delta distribution @@ -46,8 +62,7 @@ type set struct { store ds.Datastore dagService ipld.DAGService namespace ds.Key - putHook func(key string, v []byte) - deleteHook func(key string) + hooks setHooks deltaFactory func() Delta logger logging.StandardLogger @@ -62,8 +77,7 @@ func newCRDTSet( namespace ds.Key, dagService ipld.DAGService, logger logging.StandardLogger, - putHook func(key string, v []byte), - deleteHook func(key string), + hooks setHooks, deltaFactory func() Delta, ) (*set, error) { set := &set{ @@ -71,14 +85,34 @@ func newCRDTSet( store: d, dagService: dagService, logger: logger, - putHook: putHook, - deleteHook: deleteHook, + hooks: hooks, deltaFactory: deltaFactory, } return set, nil } +// triggerPutHook fires the put hook with evt. Callers build evt themselves +// and must invoke this after the relevant batch has been committed, so the +// hook callback can observe the post-write state via s.store. +func (s *set) triggerPutHook(evt PutEvent) { + if s.hooks.putHook == nil { + return + } + s.hooks.putHook(evt) +} + +// triggerDeleteHook fires the delete hook with evt. Callers build evt +// themselves and must invoke this after the relevant batch has been +// committed, so the hook callback can observe the post-write state via +// s.store. +func (s *set) triggerDeleteHook(evt DeleteEvent) { + if s.hooks.deleteHook == nil { + return + } + s.hooks.deleteHook(evt) +} + // Add returns a new delta-set adding the given key/value. func (s *set) Add(ctx context.Context, key string, value []byte) (Delta, error) { delta := s.deltaFactory() @@ -338,50 +372,6 @@ func (s *set) setPriority(ctx context.Context, writeStore ds.Write, key string, return writeStore.Put(ctx, prioK, buf[0:n]) } -// sets a value if priority is higher. When equal, it sets if the -// value is lexicographically higher than the current value. -func (s *set) setValue(ctx context.Context, writeStore ds.Write, key, id string, value []byte, prio uint64) error { - // If this key was tombstoned already, do not store/update the value. - deleted, err := s.inTombsKeyID(ctx, key, id) - if err != nil || deleted { - return err - } - - curPrio, err := s.getPriority(ctx, key) - if err != nil { - return err - } - - if prio < curPrio { - return nil - } - valueK := s.valueKey(key) - - if prio == curPrio { - curValue, _ := s.store.Get(ctx, valueK) - // new value greater than old - if bytes.Compare(curValue, value) >= 0 { - return nil - } - } - - // store value - err = writeStore.Put(ctx, valueK, value) - if err != nil { - return err - } - - // store priority - err = s.setPriority(ctx, writeStore, key, prio) - if err != nil { - return err - } - - // trigger add hook - s.putHook(key, value) - return nil -} - // findBestValue looks for all entries for the given key, figures out their // priority from their delta (skipping the blocks by the given pendingTombIDs) // and returns the value with the highest priority that is not tombstoned nor @@ -503,13 +493,19 @@ NEXT: // but with the batching optimization the locks would need to be hold until // the batch is written), and one lock per key might be way worse than a single // global lock in the end. -func (s *set) putElems(ctx context.Context, elems []*pb.Element, id string, prio uint64) error { +// +// delta is the triggering delta; it supplies the write priority and is +// forwarded to the put hook. delta must not be nil when elems is non-empty. +func (s *set) putElems(ctx context.Context, elems []*pb.Element, id string, delta Delta) error { s.putElemsMux.Lock() defer s.putElemsMux.Unlock() if len(elems) == 0 { return nil } + if delta == nil { + return errors.New("putElems: delta must not be nil when elems is non-empty") + } var store ds.Write = s.store var err error @@ -521,23 +517,118 @@ func (s *set) putElems(ctx context.Context, elems []*pb.Element, id string, prio } } + // Per-key accumulator: resolve the winning value across all same-key + // elements in this delta before writing, so the store and the put hook + // reflect CRDT semantics (lex-largest wins at tied priority) independent + // of element order, and so the hook fires once per key rather than once + // per element. + type putKeyState struct { + bestVal []byte + skipWrite bool // delta loses to the current store winner for this key + prev keyState + prevLoaded bool // prev snapshot has been read from the store + } + + prio := delta.GetPriority() + loadPrev := s.hooks.hookLoadPreviousValue && s.hooks.putHook != nil + states := make(map[string]*putKeyState) + for _, e := range elems { e.Id = id // overwrite the identifier as it would come unset key := e.GetKey() // /namespace/elems// k := s.elemsPrefix(key).ChildString(id) - err := store.Put(ctx, k, nil) - if err != nil { + if err := store.Put(ctx, k, nil); err != nil { return err } - // update the value if applicable: - // * higher priority than we currently have. - // * not tombstoned before. - err = s.setValue(ctx, store, key, id, e.GetValue(), prio) + // Skip tombstoned elements: they cannot contribute to the in-delta + // winner (mirrors setValue's inTombsKeyID check). + deleted, err := s.inTombsKeyID(ctx, key, id) if err != nil { return err } + if deleted { + continue + } + + st, ok := states[key] + if !ok { + st = &putKeyState{} + states[key] = st + + curPrio, err := s.getPriority(ctx, key) + if err != nil { + return err + } + if prio < curPrio { + st.skipWrite = true + } else if prio == curPrio { + // Tied priority: the store's current value is a competitor in + // the lex-largest tiebreak. Seed bestVal with it and snapshot + // it as prev so the post-loop no-op check can suppress a + // redundant write when no in-delta elem beats curVal. + curVal, _ := s.store.Get(ctx, s.valueKey(key)) + st.bestVal = curVal + st.prev = keyState{value: curVal, priority: curPrio} + st.prevLoaded = true + } else { + // prio > curPrio: delta always wins, no tiebreak needed + // bestVal stays nil and will be replaced by any in-delta value. + if loadPrev { + curVal, _ := s.store.Get(ctx, s.valueKey(key)) + st.prev = keyState{value: curVal, priority: curPrio} + st.prevLoaded = true + } + } + } + + if st.skipWrite { + continue + } + + if v := e.GetValue(); bytes.Compare(v, st.bestVal) > 0 { + st.bestVal = v + } + } + + var events []PutEvent + if s.hooks.putHook != nil { + events = make([]PutEvent, 0, len(states)) + } + + for key, st := range states { + if st.skipWrite { + continue + } + // Suppress no-op writes: when the chosen value and priority match + // what is already in the store (tied priority, equal bytes), nothing + // changes. Mirrors setValue's equal-bytes skip. + if st.prevLoaded && prio == st.prev.priority && bytes.Equal(st.bestVal, st.prev.value) { + continue + } + + if err := store.Put(ctx, s.valueKey(key), st.bestVal); err != nil { + return err + } + if err := s.setPriority(ctx, store, key, prio); err != nil { + return err + } + + if s.hooks.putHook == nil { + continue + } + evt := PutEvent{ + Key: ds.NewKey(key), + NewValue: st.bestVal, + NewPriority: prio, + Delta: delta, + } + if loadPrev { + evt.OldValue = st.prev.value + evt.OldPriority = st.prev.priority + } + events = append(events, evt) } if batching { @@ -546,10 +637,18 @@ func (s *set) putElems(ctx context.Context, elems []*pb.Element, id string, prio return err } } + + for _, evt := range events { + s.triggerPutHook(evt) + } return nil } -func (s *set) putTombs(ctx context.Context, tombs []*pb.Element) error { +// putTombs applies tombstones and recomputes winners for the affected keys. +// delta is the tombstone delta that triggered the removal and is forwarded to +// put/delete hooks. delta may be nil when the caller has no originating delta +// (e.g. the purge path), since it is only used for hook forwarding. +func (s *set) putTombs(ctx context.Context, tombs []*pb.Element, delta Delta) error { if len(tombs) == 0 { return nil } @@ -567,12 +666,49 @@ func (s *set) putTombs(ctx context.Context, tombs []*pb.Element) error { // key -> tombstonedBlockID. Carries the tombstoned blocks for each // element in this delta. deletedElems := make(map[string][]string) + + var newStates, prevStates map[string]keyState + if s.hooks.putHook != nil || s.hooks.deleteHook != nil { + // newStates holds the winning (value, priority) for keys that were + // partially tombstoned (tombstone removed a previous winner but a + // surviving element took over). A key absent from this map was fully + // deleted. Doubles as the fully-deleted oracle, so it is allocated + // whenever any hook is registered; nil means no hooks are configured + // and the firing loop is skipped entirely. + newStates = make(map[string]keyState) + + if s.hooks.hookLoadPreviousValue { + // prevStates caches the (value, priority) at the time each key is + // first seen in this delta, before any write. Only keys that + // existed in the store are added; absent keys are omitted so the + // two-value map lookup (v, ok) cleanly distinguishes "had a value" + // from "was not in the store". + prevStates = make(map[string]keyState) + } + } + var errs []error for _, e := range tombs { // /namespace/tombs// key := e.GetKey() id := e.GetId() valueK := s.valueKey(key) + + // Capture the current value and priority on first encounter, before any + // write for this key, so hooks receive the pre-tombstone snapshot. + if prevStates != nil { + if _, seen := prevStates[key]; !seen { + if curVal, err := s.store.Get(ctx, valueK); err == nil { + entry := keyState{value: curVal} + // Any error (including ErrNotFound): omit from map; hook + // firing uses (v, ok) to detect absence. + if curPrio, err := s.getPriority(ctx, key); err == nil { + entry.priority = curPrio + } + prevStates[key] = entry + } + } + } deletedElems[key] = append(deletedElems[key], id) // Find best value for element that we are going to delete @@ -581,7 +717,11 @@ func (s *set) putTombs(ctx context.Context, tombs []*pb.Element) error { return err } - if v == nil { + // p == 0 means findBestValue found no surviving element: real deltas + // always have priority >= 1 (assigned as height+1 in addDAGNode), so a + // zero priority can only come from the zero-value init. + if p == 0 { + delete(newStates, key) if err = store.Delete(ctx, valueK); err != nil { errs = append(errs, err) } @@ -589,6 +729,9 @@ func (s *set) putTombs(ctx context.Context, tombs []*pb.Element) error { errs = append(errs, err) } } else { + if newStates != nil { + newStates[key] = keyState{value: v, priority: p} + } if err = store.Put(ctx, valueK, v); err != nil { errs = append(errs, err) } @@ -614,11 +757,48 @@ func (s *set) putTombs(ctx context.Context, tombs []*pb.Element) error { } } - // run delete hook only once for all versions of the same element - // tombstoned in this delta. Note it may be that the element was not - // fully deleted and only a different value took its place. - for del := range deletedElems { - s.deleteHook(del) + // Fire hooks once per key after all writes are committed. + // Skipped entirely when no hooks are registered (newStates == nil). Fully + // deleted keys (absent from newStates) trigger the delete hook; partially + // tombstoned keys (present in newStates) trigger the put hook. When + // hookLoadPreviousValue is set, prevStates is used to suppress no-op hook + // firings (winning element unchanged — same value and priority — or + // tombstone for a key that had no value). + if newStates != nil { + for del := range deletedElems { + if newState, partial := newStates[del]; partial { + var prev keyState + if prevStates != nil { + prev = prevStates[del] // zero-valued if key was absent + if newState.priority == prev.priority && bytes.Equal(newState.value, prev.value) { + continue // same winning element, genuine no-op + } + } + s.triggerPutHook(PutEvent{ + Key: ds.NewKey(del), + OldValue: prev.value, + NewValue: newState.value, + OldPriority: prev.priority, + NewPriority: newState.priority, + Delta: delta, + }) + } else { + var prev keyState + if prevStates != nil { + var ok bool + prev, ok = prevStates[del] + if !ok { + continue // key had no value before tombstone, nothing to notify + } + } + s.triggerDeleteHook(DeleteEvent{ + Key: ds.NewKey(del), + LastValue: prev.value, + LastPriority: prev.priority, + Delta: delta, + }) + } + } } return nil @@ -635,12 +815,12 @@ func (s *set) Merge(ctx context.Context, d Delta, id string) error { return err } - err = s.putTombs(ctx, tombs) + err = s.putTombs(ctx, tombs, d) if err != nil { return err } - return s.putElems(ctx, elems, id, d.GetPriority()) + return s.putElems(ctx, elems, id, d) } // currently unused @@ -764,7 +944,21 @@ func (s *set) purgeKeyBlocks(ctx context.Context, key string, blockCIDs map[cid. } valueK := s.valueKey(key) - if bestVal == nil { + + // Fetch old value and priority before modifying the value key, so hooks + // receive the pre-purge snapshot. Only read when a hook that needs it is + // configured. A missing key yields prev.value == nil (Get returns nil on + // ErrNotFound), which is used below to detect "no prior value". + var prev keyState + if s.hooks.hookLoadPreviousValue && (s.hooks.putHook != nil || s.hooks.deleteHook != nil) { + prev.value, _ = s.store.Get(ctx, valueK) + prev.priority, _ = s.getPriority(ctx, key) + } + + // bestPrio == 0 means findBestValue found no surviving element: real deltas + // always have priority >= 1 (assigned as height+1 in addDAGNode), so a zero + // priority can only come from the zero-value init. + if bestPrio == 0 { var errs []error if err := s.store.Delete(ctx, valueK); err != nil && !errors.Is(err, ds.ErrNotFound) { errs = append(errs, err) @@ -775,7 +969,18 @@ func (s *set) purgeKeyBlocks(ctx context.Context, key string, blockCIDs map[cid. if err := errors.Join(errs...); err != nil { return err } - s.deleteHook(key) + // Suppress the delete hook when the key had no prior value in the + // store. Matches putTombs' "nothing to notify" rule. Only active when + // hookLoadPreviousValue is set, since that's when we know the prior + // state. + if s.hooks.hookLoadPreviousValue && prev.value == nil { + return nil + } + s.triggerDeleteHook(DeleteEvent{ + Key: ds.NewKey(key), + LastValue: prev.value, + LastPriority: prev.priority, + }) } else { if err := s.store.Put(ctx, valueK, bestVal); err != nil { return err @@ -783,7 +988,13 @@ func (s *set) purgeKeyBlocks(ctx context.Context, key string, blockCIDs map[cid. if err := s.setPriority(ctx, s.store, key, bestPrio); err != nil { return err } - s.putHook(key, bestVal) + s.triggerPutHook(PutEvent{ + Key: ds.NewKey(key), + OldValue: prev.value, + NewValue: bestVal, + OldPriority: prev.priority, + NewPriority: bestPrio, + }) } return nil diff --git a/set_test.go b/set_test.go new file mode 100644 index 0000000..027c84d --- /dev/null +++ b/set_test.go @@ -0,0 +1,1114 @@ +package crdt + +import ( + "bytes" + "testing" + + dshelp "github.com/ipfs/boxo/datastore/dshelp" + mdutils "github.com/ipfs/boxo/ipld/merkledag/test" + cid "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + pb "github.com/ipfs/go-ds-crdt/pb" + ipld "github.com/ipfs/go-ipld-format" +) + +func newTestSet(t *testing.T, dagService ipld.DAGService, hooks setHooks) *set { + t.Helper() + store := dssync.MutexWrap(ds.NewMapDatastore()) + df := func() Delta { return &pbDelta{Delta: &pb.Delta{}} } + s, err := newCRDTSet(t.Context(), store, ds.NewKey("/testset"), dagService, DefaultOptions().Logger, hooks, df) + if err != nil { + t.Fatalf("newCRDTSet: %v", err) + } + return s +} + +// addElem creates a real DAG block for (key, value, prio), stores it in +// dagService, seeds the set with putElems, and returns the block key (element ID). +func addElem(t *testing.T, s *set, dagService ipld.DAGService, key string, value []byte, prio uint64) string { + t.Helper() + ctx := t.Context() + d := &pbDelta{Delta: &pb.Delta{}} + d.SetElements([]*pb.Element{{Key: key, Value: value}}) + d.SetPriority(prio) + + node, err := makeNode(d, nil) + if err != nil { + t.Fatalf("makeNode: %v", err) + } + if err := dagService.Add(ctx, node); err != nil { + t.Fatalf("dagService.Add: %v", err) + } + blockKey := dshelp.MultihashToDsKey(node.Cid().Hash()).String() + + elems, err := d.GetElements() + if err != nil { + t.Fatalf("GetElements: %v", err) + } + if err := s.putElems(ctx, elems, blockKey, d); err != nil { + t.Fatalf("putElems: %v", err) + } + return blockKey +} + +// TestPutTombsEmpty verifies that putTombs with an empty list is a no-op. +func TestPutTombsEmpty(t *testing.T) { + t.Parallel() + ctx := t.Context() + var fired bool + s := newTestSet(t, mdutils.Mock(), setHooks{ + putHook: func(PutEvent) { fired = true }, + deleteHook: func(DeleteEvent) { fired = true }, + }) + + if err := s.putTombs(ctx, nil, nil); err != nil { + t.Fatal(err) + } + if err := s.putTombs(ctx, []*pb.Element{}, nil); err != nil { + t.Fatal(err) + } + if fired { + t.Error("no hooks should fire for an empty tombstone list") + } +} + +// TestPutTombsFullDelete verifies tombstoning the only element for a key: +// the store is cleaned up and the appropriate hook fires. +func TestPutTombsFullDelete(t *testing.T) { + t.Parallel() + + t.Run("store cleanup", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + s := newTestSet(t, dag, setHooks{}) + id := addElem(t, s, dag, "foo", []byte("hello"), 1) + + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id}}, nil); err != nil { + t.Fatal(err) + } + if _, err := s.store.Get(ctx, s.valueKey("foo")); err != ds.ErrNotFound { + t.Errorf("value key should be deleted, got err=%v", err) + } + if _, err := s.store.Get(ctx, s.priorityKey("foo")); err != ds.ErrNotFound { + t.Errorf("priority key should be deleted, got err=%v", err) + } + if inSet, _ := s.InSet(ctx, "foo"); inSet { + t.Error("key should not be in set after full tombstone") + } + }) + + t.Run("deleteHook", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var keys []ds.Key + s := newTestSet(t, dag, setHooks{deleteHook: func(e DeleteEvent) { keys = append(keys, e.Key) }}) + id := addElem(t, s, dag, "foo", []byte("hello"), 1) + + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id}}, nil); err != nil { + t.Fatal(err) + } + if len(keys) != 1 || keys[0].String() != ds.NewKey("foo").String() { + t.Errorf("deleteHook calls = %v, want [/foo]", keys) + } + }) + + t.Run("deleteHook receives lastVal with hookLoadPreviousValue", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotKey ds.Key + var gotVal []byte + s := newTestSet(t, dag, setHooks{ + deleteHook: func(e DeleteEvent) { gotKey, gotVal = e.Key, e.LastValue }, + hookLoadPreviousValue: true, + }) + id := addElem(t, s, dag, "foo", []byte("hello"), 1) + + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id}}, nil); err != nil { + t.Fatal(err) + } + if gotKey.String() != ds.NewKey("foo").String() { + t.Errorf("deleteHook key = %q, want /foo", gotKey) + } + if string(gotVal) != "hello" { + t.Errorf("deleteHook lastVal = %q, want hello", gotVal) + } + }) +} + +// TestPutTombsNilValue verifies correct hook behavior when a key's stored value +// is nil (e.g. Put(ctx, k, nil)). A nil stored value is distinct from "key not +// found": delete hooks must fire, and deleteHook must receive nil as lastVal. +func TestPutTombsNilValue(t *testing.T) { + t.Parallel() + + t.Run("deleteHook fires for nil value", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var deleteFired bool + s := newTestSet(t, dag, setHooks{ + deleteHook: func(DeleteEvent) { deleteFired = true }, + }) + id := addElem(t, s, dag, "foo", nil, 1) + + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id}}, nil); err != nil { + t.Fatal(err) + } + if !deleteFired { + t.Error("deleteHook must fire even when the stored value is nil") + } + }) + + t.Run("deleteHook receives nil lastVal", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotKey ds.Key + var gotVal []byte + var called bool + s := newTestSet(t, dag, setHooks{ + deleteHook: func(e DeleteEvent) { gotKey, gotVal, called = e.Key, e.LastValue, true }, + hookLoadPreviousValue: true, + }) + id := addElem(t, s, dag, "foo", nil, 1) + + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id}}, nil); err != nil { + t.Fatal(err) + } + if !called { + t.Fatal("deleteHook must be called") + } + if gotKey.String() != ds.NewKey("foo").String() { + t.Errorf("deleteHook key = %q, want /foo", gotKey) + } + if gotVal != nil { + t.Errorf("deleteHook lastVal = %q, want nil", gotVal) + } + }) + + t.Run("nil survivor keeps key with nil value", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var deleteFired bool + var putFired bool + var putOldVal, putNewVal []byte + s := newTestSet(t, dag, setHooks{ + putHook: func(e PutEvent) { + putFired = true + putOldVal = e.OldValue + putNewVal = e.NewValue + }, + deleteHook: func(DeleteEvent) { deleteFired = true }, + hookLoadPreviousValue: true, + }) + // "aaa" wins over nil lexicographically; store value is "aaa". + addElem(t, s, dag, "foo", nil, 1) // loses + id2 := addElem(t, s, dag, "foo", []byte("aaa"), 1) // wins + putFired = false + + // After tombstoning id2, id1 (nil value) is still a live, non-tombstoned + // element — the key must remain in the set with a nil value. + // putTombs distinguishes "no survivors" from "nil-valued survivor" via + // priority (p == 0 iff no survivor), so this takes the partial-tombstone + // path: value is replaced with nil and putHook fires. + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id2}}, nil); err != nil { + t.Fatal(err) + } + if deleteFired { + t.Error("deleteHook must not fire; a survivor remains") + } + if !putFired { + t.Fatal("putHook must fire; value changed from aaa to nil") + } + if string(putOldVal) != "aaa" { + t.Errorf("putHook OldValue = %q, want aaa", putOldVal) + } + if putNewVal != nil { + t.Errorf("putHook NewValue = %q, want nil", putNewVal) + } + if inSet, err := s.InSet(ctx, "foo"); err != nil { + t.Fatalf("InSet: %v", err) + } else if !inSet { + t.Error("key should remain in set while nil-valued survivor exists") + } + }) +} + +// TestPutTombsPartialTombstone verifies that tombstoning one of two elements +// leaves the survivor in the store and fires the right hook. +// Two elements at equal priority: "bbb" wins lexicographically over "aaa". +// Tombstoning the "aaa" block leaves "bbb" as the survivor. +func TestPutTombsPartialTombstone(t *testing.T) { + t.Parallel() + + t.Run("surviving value kept", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + s := newTestSet(t, dag, setHooks{}) + id1 := addElem(t, s, dag, "foo", []byte("aaa"), 1) + addElem(t, s, dag, "foo", []byte("bbb"), 1) + + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id1}}, nil); err != nil { + t.Fatal(err) + } + val, err := s.Element(ctx, "foo") + if err != nil { + t.Fatalf("Element: %v", err) + } + if string(val) != "bbb" { + t.Errorf("surviving value = %q, want bbb", val) + } + }) + + t.Run("no hook when value unchanged", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var putFired, deleteFired bool + s := newTestSet(t, dag, setHooks{ + putHook: func(PutEvent) { putFired = true }, + deleteHook: func(DeleteEvent) { deleteFired = true }, + hookLoadPreviousValue: true, + }) + id1 := addElem(t, s, dag, "foo", []byte("aaa"), 1) // "aaa" loses + addElem(t, s, dag, "foo", []byte("bbb"), 1) // "bbb" wins + putFired = false + + // Tombstone the loser: "bbb" remains the winner, no observable change. + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id1}}, nil); err != nil { + t.Fatal(err) + } + if putFired || deleteFired { + t.Error("no hook should fire when tombstoning a non-winning element") + } + }) + + t.Run("putHook fires not deleteHook", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var putVals [][]byte + var deleteFired bool + s := newTestSet(t, dag, setHooks{ + putHook: func(e PutEvent) { putVals = append(putVals, e.NewValue) }, + deleteHook: func(DeleteEvent) { deleteFired = true }, + }) + addElem(t, s, dag, "foo", []byte("aaa"), 1) + id2 := addElem(t, s, dag, "foo", []byte("bbb"), 1) // "bbb" wins, is current value + putVals = nil // discard calls from addElem + + // Tombstone the current winner ("bbb"). "aaa" becomes the new winner, + // so the value changes and putHook must fire. + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id2}}, nil); err != nil { + t.Fatal(err) + } + if deleteFired { + t.Error("deleteHook must not fire when a surviving element exists") + } + if len(putVals) != 1 || string(putVals[0]) != "aaa" { + t.Errorf("putHook vals = %q, want [aaa]", putVals) + } + }) + + t.Run("putHook receives newVal and oldVal with hookLoadPreviousValue", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + type call struct{ newVal, oldVal []byte } + var calls []call + s := newTestSet(t, dag, setHooks{ + putHook: func(e PutEvent) { calls = append(calls, call{e.NewValue, e.OldValue}) }, + hookLoadPreviousValue: true, + }) + addElem(t, s, dag, "foo", []byte("aaa"), 1) + id2 := addElem(t, s, dag, "foo", []byte("bbb"), 1) // "bbb" wins, is current value + calls = nil + + // Tombstone the current winner ("bbb"). "aaa" becomes the new winner: + // putHook must fire with newVal="aaa", oldVal="bbb". + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id2}}, nil); err != nil { + t.Fatal(err) + } + if len(calls) != 1 { + t.Fatalf("expected 1 putHook call, got %d", len(calls)) + } + if string(calls[0].newVal) != "aaa" { + t.Errorf("putHook newVal = %q, want aaa", calls[0].newVal) + } + if string(calls[0].oldVal) != "bbb" { + t.Errorf("putHook oldVal = %q, want bbb", calls[0].oldVal) + } + }) +} + +// TestPutTombsMultipleKeys verifies that a single putTombs call handles +// tombstones for multiple keys independently. +func TestPutTombsMultipleKeys(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + deleted := make(map[string][]byte) + var putCalled bool + s := newTestSet(t, dag, setHooks{ + deleteHook: func(e DeleteEvent) { deleted[e.Key.String()] = e.LastValue }, + putHook: func(PutEvent) { putCalled = true }, + hookLoadPreviousValue: true, + }) + id1 := addElem(t, s, dag, "key1", []byte("val1"), 1) + id2 := addElem(t, s, dag, "key2", []byte("val2"), 1) + putCalled = false // discard calls from addElem setup + + tombs := []*pb.Element{{Key: "key1", Id: id1}, {Key: "key2", Id: id2}} + if err := s.putTombs(ctx, tombs, nil); err != nil { + t.Fatal(err) + } + if len(deleted) != 2 { + t.Fatalf("expected 2 deleteHook calls, got %d", len(deleted)) + } + if string(deleted[ds.NewKey("key1").String()]) != "val1" { + t.Errorf("key1 lastVal = %q, want val1", deleted[ds.NewKey("key1").String()]) + } + if string(deleted[ds.NewKey("key2").String()]) != "val2" { + t.Errorf("key2 lastVal = %q, want val2", deleted[ds.NewKey("key2").String()]) + } + if putCalled { + t.Error("putHook must not fire for full-delete tombstones") + } +} + +// TestPutTombsNonExistentKey verifies that tombstoning a block ID for a key +// that was never PUT does not crash: the tomb is recorded, and when +// hookLoadPreviousValue is set, no hooks fire since there was no prior value to +// report. +func TestPutTombsNonExistentKey(t *testing.T) { + t.Parallel() + ctx := t.Context() + var putFired, deleteFired bool + s := newTestSet(t, mdutils.Mock(), setHooks{ + putHook: func(PutEvent) { putFired = true }, + deleteHook: func(DeleteEvent) { deleteFired = true }, + hookLoadPreviousValue: true, + }) + + if err := s.putTombs(ctx, []*pb.Element{{Key: "ghost", Id: "fakeid"}}, nil); err != nil { + t.Fatalf("putTombs: %v", err) + } + inTomb, err := s.inTombsKeyID(ctx, "ghost", "fakeid") + if err != nil { + t.Fatalf("inTombsKeyID: %v", err) + } + if !inTomb { + t.Error("tomb entry should be written even for a key never PUT") + } + if inSet, _ := s.InSet(ctx, "ghost"); inSet { + t.Error("key must not be in set") + } + if putFired { + t.Error("putHook must not fire for a key that was never PUT") + } + if deleteFired { + t.Error("deleteHook must not fire when key had no prior value") + } +} + +// TestPutTombsPrevValsFirstEncounterOnly verifies that prevVals is captured +// before any write for a key: deleteHook receives the pre-call value even +// when multiple tombs for the same key appear in one delta. +func TestPutTombsPrevValsFirstEncounterOnly(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var putFired bool + var lastVals [][]byte + s := newTestSet(t, dag, setHooks{ + putHook: func(PutEvent) { putFired = true }, + deleteHook: func(e DeleteEvent) { lastVals = append(lastVals, e.LastValue) }, + hookLoadPreviousValue: true, + }) + id1 := addElem(t, s, dag, "foo", []byte("first"), 1) + id2 := addElem(t, s, dag, "foo", []byte("second"), 2) // wins (higher prio) + putFired = false + + tombs := []*pb.Element{{Key: "foo", Id: id1}, {Key: "foo", Id: id2}} + if err := s.putTombs(ctx, tombs, nil); err != nil { + t.Fatal(err) + } + if putFired { + t.Error("putHook must not fire when tombstoning a key with a surviving element") + } + if len(lastVals) != 1 { + t.Fatalf("expected 1 deleteHook call, got %d", len(lastVals)) + } + if string(lastVals[0]) != "second" { + t.Errorf("deleteHook lastVal = %q, want second", lastVals[0]) + } +} + +// TestPutTombsIdempotent verifies that applying the same tombstone twice +// leaves the key absent from the set. +func TestPutTombsIdempotent(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var deleteFiredCount uint8 + s := newTestSet(t, dag, setHooks{ + deleteHook: func(DeleteEvent) { deleteFiredCount++ }, + hookLoadPreviousValue: true, + }) + id := addElem(t, s, dag, "foo", []byte("hello"), 1) + tombs := []*pb.Element{{Key: "foo", Id: id}} + + for range 5 { + if err := s.putTombs(ctx, tombs, nil); err != nil { + t.Fatal(err) + } + } + if inSet, _ := s.InSet(ctx, "foo"); inSet { + t.Error("key should not be in set after re-tombstoning") + } + if deleteFiredCount != 1 { + t.Errorf("deleteHook should fire only once for the same tombstone, got %d", deleteFiredCount) + } +} + +// TestPutElemsDeltaForwarded verifies that the Delta passed to putElems is +// forwarded identically to the putHook. +func TestPutElemsDeltaForwarded(t *testing.T) { + t.Parallel() + ctx := t.Context() + var gotDelta Delta + s := newTestSet(t, mdutils.Mock(), setHooks{ + putHook: func(e PutEvent) { gotDelta = e.Delta }, + }) + + var prio uint64 = 42 + d := &pbDelta{Delta: &pb.Delta{DagName: "dag-test", Priority: prio}} + if err := s.putElems(ctx, []*pb.Element{{Key: "foo", Value: []byte("v")}}, "block1", d); err != nil { + t.Fatal(err) + } + if gotDelta != d { + t.Fatalf("putHook delta pointer mismatch: got %p want %p", gotDelta, d) + } + if gotDelta.GetPriority() != prio { + t.Errorf("forwarded delta priority = %d, want %d", gotDelta.GetPriority(), prio) + } + if gotDelta.GetDagName() != "dag-test" { + t.Errorf("forwarded delta dagName = %q, want dag-test", gotDelta.GetDagName()) + } +} + +// TestPutTombsFullDeleteDeltaForwarded verifies that a tombstone-only delta is +// forwarded to the deleteHook when a key is fully deleted. +func TestPutTombsFullDeleteDeltaForwarded(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotDelta Delta + s := newTestSet(t, dag, setHooks{ + deleteHook: func(e DeleteEvent) { gotDelta = e.Delta }, + }) + id := addElem(t, s, dag, "foo", []byte("hello"), 1) + + tombDelta := &pbDelta{Delta: &pb.Delta{DagName: "tomb-dag"}} + tombs := []*pb.Element{{Key: "foo", Id: id}} + if err := s.putTombs(ctx, tombs, tombDelta); err != nil { + t.Fatal(err) + } + if gotDelta != tombDelta { + t.Fatalf("deleteHook delta pointer mismatch: got %p want %p", gotDelta, tombDelta) + } + if gotDelta.GetDagName() != "tomb-dag" { + t.Errorf("forwarded delta dagName = %q, want tomb-dag", gotDelta.GetDagName()) + } +} + +// TestPutTombsPartialPutDeltaForwarded verifies that when a tombstone removes +// the current winner and a surviving element takes over, the putHook +// receives the tombstone delta (the one that triggered the MV change), not +// the original put delta. +func TestPutTombsPartialPutDeltaForwarded(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotEvent PutEvent + // Only capture the hook call triggered by putTombs; ignore the addElem puts. + var capture bool + s := newTestSet(t, dag, setHooks{ + putHook: func(e PutEvent) { + if capture { + gotEvent = e + } + }, + hookLoadPreviousValue: true, + }) + // Seed two elements; "aaa" at prio 2 is the current winner. + addElem(t, s, dag, "foo", []byte("zzz"), 1) + id2 := addElem(t, s, dag, "foo", []byte("aaa"), 2) + + capture = true + tombDelta := &pbDelta{Delta: &pb.Delta{DagName: "tomb-dag", Priority: 3}} + // Tombstone the winner so "zzz" takes over — that changes the MV and fires putHook. + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id2}}, tombDelta); err != nil { + t.Fatal(err) + } + if gotEvent.Delta != tombDelta { + t.Fatalf("partial-put putHook delta should be the tombstone delta: got %p want %p", gotEvent.Delta, tombDelta) + } + if string(gotEvent.NewValue) != "zzz" || string(gotEvent.OldValue) != "aaa" { + t.Errorf("putHook values = (new=%q, old=%q), want (new=zzz, old=aaa)", gotEvent.NewValue, gotEvent.OldValue) + } +} + +// TestPutElemsPrioritiesReported verifies that PutEvent.NewPriority matches +// the delta priority for a normal put and PutEvent.OldPriority carries the +// replaced value's priority when HookLoadPreviousValue is set. +func TestPutElemsPrioritiesReported(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotEvent PutEvent + capture := false + s := newTestSet(t, dag, setHooks{ + putHook: func(e PutEvent) { + if capture { + gotEvent = e + } + }, + hookLoadPreviousValue: true, + }) + var oldPrio uint64 = 3 + addElem(t, s, dag, "foo", []byte("old"), oldPrio) + + capture = true + var newPrio uint64 = 7 + d := &pbDelta{Delta: &pb.Delta{Priority: newPrio}} + if err := s.putElems(ctx, []*pb.Element{{Key: "foo", Value: []byte("new")}}, "block-new", d); err != nil { + t.Fatal(err) + } + if gotEvent.NewPriority != newPrio { + t.Errorf("NewPriority = %d, want %d (delta priority)", gotEvent.NewPriority, newPrio) + } + if gotEvent.OldPriority != oldPrio { + t.Errorf("OldPriority = %d, want %d (prior value's priority)", gotEvent.OldPriority, oldPrio) + } +} + +// TestPutElemsOldPriorityZeroWhenNotLoaded verifies that OldPriority is 0 +// when HookLoadPreviousValue is false, matching the OldValue behavior. +func TestPutElemsOldPriorityZeroWhenNotLoaded(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotEvent PutEvent + capture := false + s := newTestSet(t, dag, setHooks{ + putHook: func(e PutEvent) { + if capture { + gotEvent = e + } + }, + }) + addElem(t, s, dag, "foo", []byte("old"), 3) + + capture = true + var newPrio uint64 = 7 + d := &pbDelta{Delta: &pb.Delta{Priority: newPrio}} + if err := s.putElems(ctx, []*pb.Element{{Key: "foo", Value: []byte("new")}}, "block-new", d); err != nil { + t.Fatal(err) + } + if gotEvent.NewPriority != newPrio { + t.Errorf("NewPriority = %d, want %d", gotEvent.NewPriority, newPrio) + } + if gotEvent.OldPriority != 0 { + t.Errorf("OldPriority = %d, want 0 (HookLoadPreviousValue is false)", gotEvent.OldPriority) + } +} + +// TestPutTombsPartialPutPriorities verifies that for a partial-tombstone put +// NewPriority carries the surviving element's priority (not the tombstone +// delta's priority) and OldPriority carries the previous winner's priority. +// This is the case where NewPriority cannot be inferred from Delta. +func TestPutTombsPartialPutPriorities(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotEvent PutEvent + capture := false + s := newTestSet(t, dag, setHooks{ + putHook: func(e PutEvent) { + if capture { + gotEvent = e + } + }, + hookLoadPreviousValue: true, + }) + // Seed two elements; "aaa" at prio 2 is the current winner. + addElem(t, s, dag, "foo", []byte("zzz"), 1) + id2 := addElem(t, s, dag, "foo", []byte("aaa"), 2) + + capture = true + // Tombstone delta priority (5) is deliberately different from the + // surviving element priority (1) to prove they are distinct. + tombDelta := &pbDelta{Delta: &pb.Delta{Priority: 5}} + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id2}}, tombDelta); err != nil { + t.Fatal(err) + } + if gotEvent.NewPriority != 1 { + t.Errorf("NewPriority = %d, want 1 (surviving element's priority, not the tombstone delta's)", gotEvent.NewPriority) + } + if gotEvent.OldPriority != 2 { + t.Errorf("OldPriority = %d, want 2 (tombstoned winner's priority)", gotEvent.OldPriority) + } +} + +// TestPutTombsPartialPutSameValueDifferentPriority verifies that when a +// tombstone removes the current winner and a surviving element with the SAME +// value bytes but a different priority takes over, the putHook still fires — +// the priority change is a state transition that must be surfaced even though +// the value bytes are unchanged. +func TestPutTombsPartialPutSameValueDifferentPriority(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotEvent PutEvent + var fired bool + s := newTestSet(t, dag, setHooks{ + putHook: func(e PutEvent) { gotEvent = e; fired = true }, + hookLoadPreviousValue: true, + }) + // Two elements with the SAME value at different priorities. The higher + // priority (id2) currently wins. Tombstoning id2 leaves id1 — identical + // value, but lower priority. + addElem(t, s, dag, "foo", []byte("same"), 1) + id2 := addElem(t, s, dag, "foo", []byte("same"), 2) + fired = false // reset: the second addElem fires putHook + + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id2}}, &pbDelta{Delta: &pb.Delta{Priority: 5}}); err != nil { + t.Fatal(err) + } + if !fired { + t.Fatal("putHook must fire on partial tombstone, even when value is unchanged, to surface the priority change") + } + if gotEvent.OldPriority != 2 { + t.Errorf("OldPriority = %d, want 2", gotEvent.OldPriority) + } + if gotEvent.NewPriority != 1 { + t.Errorf("NewPriority = %d, want 1", gotEvent.NewPriority) + } + if !bytes.Equal(gotEvent.OldValue, []byte("same")) { + t.Errorf("OldValue = %q, want same", gotEvent.OldValue) + } + if !bytes.Equal(gotEvent.NewValue, []byte("same")) { + t.Errorf("NewValue = %q, want same", gotEvent.NewValue) + } +} + +// TestPutTombsFullDeleteLastPriority verifies that the DeleteEvent carries +// the priority of the removed value when HookLoadPreviousValue is set. +func TestPutTombsFullDeleteLastPriority(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotEvent DeleteEvent + s := newTestSet(t, dag, setHooks{ + deleteHook: func(e DeleteEvent) { gotEvent = e }, + hookLoadPreviousValue: true, + }) + id := addElem(t, s, dag, "foo", []byte("hello"), 4) + + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id}}, &pbDelta{Delta: &pb.Delta{Priority: 9}}); err != nil { + t.Fatal(err) + } + if gotEvent.LastPriority != 4 { + t.Errorf("LastPriority = %d, want 4 (deleted value's priority)", gotEvent.LastPriority) + } +} + +// TestCustomDeltaTypeAssert verifies that callbacks can type-assert the Delta +// back to a concrete implementation to reach application-specific fields +func TestCustomDeltaTypeAssert(t *testing.T) { + t.Parallel() + ctx := t.Context() + var gotCutsomField int64 + s := newTestSet(t, mdutils.Mock(), setHooks{ + putHook: func(e PutEvent) { + if cd, ok := e.Delta.(*customDelta); ok { + gotCutsomField = cd.CustomField + } + }, + }) + + d := &customDelta{pbDelta: pbDelta{Delta: &pb.Delta{Priority: 1}}, CustomField: 1713456789} + if err := s.putElems(ctx, []*pb.Element{{Key: "foo", Value: []byte("v")}}, "block1", d); err != nil { + t.Fatal(err) + } + if gotCutsomField != 1713456789 { + t.Errorf("type-asserted custom field = %d, want 1713456789", gotCutsomField) + } +} + +// customDelta embeds pbDelta and adds an application-specific field, mimicking +// how an external application supplies its own DeltaFactory. +type customDelta struct { + pbDelta + CustomField int64 +} + +// TestPutTombsHigherPriorityWins verifies that the higher-priority element +// survives when the lower-priority one is tombstoned. +func TestPutTombsHigherPriorityWins(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + s := newTestSet(t, dag, setHooks{}) + id1 := addElem(t, s, dag, "foo", []byte("zzz"), 1) // high lex, low prio + addElem(t, s, dag, "foo", []byte("aaa"), 2) // low lex, high prio + + if err := s.putTombs(ctx, []*pb.Element{{Key: "foo", Id: id1}}, nil); err != nil { + t.Fatal(err) + } + val, err := s.Element(ctx, "foo") + if err != nil { + t.Fatalf("Element: %v", err) + } + if string(val) != "aaa" { + t.Errorf("surviving value = %q, want aaa (high-priority element)", val) + } +} + +// TestCRDTPutElemsDuplicateKeyInSameDelta verifies that when a single delta +// contains multiple elements for the same key, putElems treats the group as +// one logical key transition: +// +// 1. The put hook fires exactly once per key, not once per element. Firing +// per-element makes downstream observers (e.g. usage accountants) count a +// single key insertion N times. +// 2. The winning value follows CRDT conflict resolution across the elements +// — all share the delta's priority, so the lex-largest value wins — +// independent of the elements' order in the slice. +// 3. The PutEvent reports the pre-delta store snapshot as OldValue, not a +// sibling element's value from earlier in the same iteration. +// +// The test mirrors putTombs' per-key deduplication pattern (see +// TestPutTombsMultipleKeys and the newStates/prevStates handling in +// putTombs) and is expected to fail against the current putElems +// implementation, which iterates elements independently and emits one +// PutEvent per element. +func TestCRDTPutElemsDuplicateKeyInSameDelta(t *testing.T) { + t.Parallel() + + t.Run("hook fires once per key", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + var events []PutEvent + s := newTestSet(t, mdutils.Mock(), setHooks{ + putHook: func(e PutEvent) { events = append(events, e) }, + hookLoadPreviousValue: true, + }) + + d := &pbDelta{Delta: &pb.Delta{Priority: 1}} + elems := []*pb.Element{ + {Key: "foo", Value: []byte("zzz")}, + {Key: "foo", Value: []byte("aaa")}, + } + if err := s.putElems(ctx, elems, "block-multi", d); err != nil { + t.Fatal(err) + } + if len(events) != 1 { + t.Fatalf("putHook call count = %d, want 1 (one hook per key, not per element)", len(events)) + } + }) + + t.Run("winning value is lex-largest regardless of order", func(t *testing.T) { + t.Parallel() + + // Element ordering within the delta must not affect the outcome: + // conflict resolution across same-key, same-priority elements picks + // the lex-largest value. + for _, tc := range []struct { + name string + elems []*pb.Element + }{ + { + name: "ascending (aaa then zzz)", + elems: []*pb.Element{ + {Key: "foo", Value: []byte("aaa")}, + {Key: "foo", Value: []byte("zzz")}, + }, + }, + { + name: "descending (zzz then aaa)", + elems: []*pb.Element{ + {Key: "foo", Value: []byte("zzz")}, + {Key: "foo", Value: []byte("aaa")}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + ctx := t.Context() + var events []PutEvent + s := newTestSet(t, mdutils.Mock(), setHooks{ + putHook: func(e PutEvent) { events = append(events, e) }, + hookLoadPreviousValue: true, + }) + + d := &pbDelta{Delta: &pb.Delta{Priority: 1}} + if err := s.putElems(ctx, tc.elems, "block-multi", d); err != nil { + t.Fatal(err) + } + if len(events) != 1 { + t.Fatalf("putHook call count = %d, want 1", len(events)) + } + if !bytes.Equal(events[0].NewValue, []byte("zzz")) { + t.Errorf("NewValue = %q, want zzz (lex-largest at tied priority)", events[0].NewValue) + } + if events[0].OldValue != nil { + t.Errorf("OldValue = %q, want nil (key did not exist pre-delta)", events[0].OldValue) + } + val, err := s.Element(ctx, "foo") + if err != nil { + t.Fatalf("Element: %v", err) + } + if !bytes.Equal(val, []byte("zzz")) { + t.Errorf("stored value = %q, want zzz (CRDT winner must not depend on element order in delta)", val) + } + }) + } + }) + + t.Run("OldValue is pre-delta snapshot, not a sibling in-delta value", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var events []PutEvent + capture := false + s := newTestSet(t, dag, setHooks{ + putHook: func(e PutEvent) { + if capture { + events = append(events, e) + } + }, + hookLoadPreviousValue: true, + }) + + // Seed the store so the key has a committed pre-delta value. + addElem(t, s, dag, "foo", []byte("seed"), 1) + + capture = true + d := &pbDelta{Delta: &pb.Delta{Priority: 2}} + elems := []*pb.Element{ + {Key: "foo", Value: []byte("a")}, + {Key: "foo", Value: []byte("z")}, // in-delta winner at equal priority + } + if err := s.putElems(ctx, elems, "block-multi", d); err != nil { + t.Fatal(err) + } + if len(events) != 1 { + t.Fatalf("putHook call count = %d, want 1", len(events)) + } + if !bytes.Equal(events[0].OldValue, []byte("seed")) { + t.Errorf("OldValue = %q, want seed (pre-delta committed value)", events[0].OldValue) + } + if !bytes.Equal(events[0].NewValue, []byte("z")) { + t.Errorf("NewValue = %q, want z (in-delta lex-largest)", events[0].NewValue) + } + if events[0].OldPriority != 1 { + t.Errorf("OldPriority = %d, want 1 (seed's priority)", events[0].OldPriority) + } + if events[0].NewPriority != 2 { + t.Errorf("NewPriority = %d, want 2 (delta priority)", events[0].NewPriority) + } + }) +} + +// TestPurgeKeyBlocksDeltaNil verifies that hooks fired from the purge path +// receive a nil Delta, since no originating delta triggered the state change. +func TestPurgeKeyBlocksDeltaNil(t *testing.T) { + t.Parallel() + + t.Run("full purge fires deleteHook with nil delta", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotEvent DeleteEvent + var fired bool + s := newTestSet(t, dag, setHooks{ + deleteHook: func(e DeleteEvent) { gotEvent = e; fired = true }, + hookLoadPreviousValue: true, + }) + id := addElem(t, s, dag, "foo", []byte("hello"), 1) + + c := blockIDToCid(t, id) + if err := s.purgeKeyBlocks(ctx, "foo", map[cid.Cid]struct{}{c: {}}, true, false); err != nil { + t.Fatal(err) + } + if !fired { + t.Fatal("deleteHook should fire for full purge") + } + if gotEvent.Delta != nil { + t.Errorf("deleteHook delta = %v, want nil (no originating delta on purge path)", gotEvent.Delta) + } + if string(gotEvent.LastValue) != "hello" { + t.Errorf("deleteHook lastValue = %q, want hello", gotEvent.LastValue) + } + }) + + t.Run("partial purge fires putHook with nil delta", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotEvent PutEvent + var fired bool + s := newTestSet(t, dag, setHooks{ + putHook: func(e PutEvent) { gotEvent = e; fired = true }, + }) + id1 := addElem(t, s, dag, "foo", []byte("zzz"), 1) + addElem(t, s, dag, "foo", []byte("aaa"), 2) + + c := blockIDToCid(t, id1) + if err := s.purgeKeyBlocks(ctx, "foo", map[cid.Cid]struct{}{c: {}}, true, false); err != nil { + t.Fatal(err) + } + if !fired { + t.Fatal("putHook should fire when partial purge changes the winner") + } + if gotEvent.Delta != nil { + t.Errorf("putHook delta = %v, want nil (no originating delta on purge path)", gotEvent.Delta) + } + }) +} + +// TestPurgeKeyBlocksPriorities verifies that hooks fired from the purge path +// carry the correct old/new priorities when HookLoadPreviousValue is set. +func TestPurgeKeyBlocksPriorities(t *testing.T) { + t.Parallel() + + t.Run("full purge reports LastPriority", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotEvent DeleteEvent + s := newTestSet(t, dag, setHooks{ + deleteHook: func(e DeleteEvent) { gotEvent = e }, + hookLoadPreviousValue: true, + }) + id := addElem(t, s, dag, "foo", []byte("hello"), 6) + + c := blockIDToCid(t, id) + if err := s.purgeKeyBlocks(ctx, "foo", map[cid.Cid]struct{}{c: {}}, true, false); err != nil { + t.Fatal(err) + } + if gotEvent.LastPriority != 6 { + t.Errorf("LastPriority = %d, want 6", gotEvent.LastPriority) + } + }) + + t.Run("partial purge reports old and new priorities", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotEvent PutEvent + s := newTestSet(t, dag, setHooks{ + putHook: func(e PutEvent) { gotEvent = e }, + hookLoadPreviousValue: true, + }) + addElem(t, s, dag, "foo", []byte("zzz"), 1) + id2 := addElem(t, s, dag, "foo", []byte("aaa"), 2) // current winner + + c2 := blockIDToCid(t, id2) + if err := s.purgeKeyBlocks(ctx, "foo", map[cid.Cid]struct{}{c2: {}}, true, false); err != nil { + t.Fatal(err) + } + if gotEvent.OldPriority != 2 { + t.Errorf("OldPriority = %d, want 2 (purged winner's priority)", gotEvent.OldPriority) + } + if gotEvent.NewPriority != 1 { + t.Errorf("NewPriority = %d, want 1 (surviving element's priority)", gotEvent.NewPriority) + } + }) +} + +// TestPurgeKeyBlocksSuppression covers the two edge cases where purgeKeyBlocks +// needs to make a decision about whether to fire a hook: +// 1. full purge on a key that had no prior value → deleteHook must NOT fire +// (nothing to notify — there was no value to delete) +// 2. partial purge where the surviving value equals the pre-purge value → +// putHook must STILL fire, because a partial purge always replaces the +// winning element and therefore changes the priority +func TestPurgeKeyBlocksSuppression(t *testing.T) { + t.Parallel() + + t.Run("full purge on key with no prior value suppresses deleteHook", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var deleteFired bool + s := newTestSet(t, dag, setHooks{ + deleteHook: func(DeleteEvent) { deleteFired = true }, + hookLoadPreviousValue: true, + }) + // Create a real block for an unrelated key so we have a valid CID, but + // never insert an element under "ghost": the value key for "ghost" is + // absent, so hadPrior must be false. + blockID := addElem(t, s, dag, "other", []byte("x"), 1) + c := blockIDToCid(t, blockID) + + if err := s.purgeKeyBlocks(ctx, "ghost", map[cid.Cid]struct{}{c: {}}, true, false); err != nil { + t.Fatal(err) + } + if deleteFired { + t.Error("deleteHook must not fire when purged key had no prior value") + } + }) + + t.Run("partial purge with unchanged value still fires putHook (priority changed)", func(t *testing.T) { + t.Parallel() + ctx := t.Context() + dag := mdutils.Mock() + var gotEvent PutEvent + var fired bool + s := newTestSet(t, dag, setHooks{ + putHook: func(e PutEvent) { gotEvent = e; fired = true }, + hookLoadPreviousValue: true, + }) + // Two elements with the SAME value at different priorities. The higher + // priority (id2) currently wins. Purging id2 leaves id1 — identical + // value, but lower priority. The priority change IS a state transition + // and must be surfaced via the put hook. + addElem(t, s, dag, "foo", []byte("same"), 1) + id2 := addElem(t, s, dag, "foo", []byte("same"), 2) + fired = false // reset: the second addElem fires putHook + + c := blockIDToCid(t, id2) + if err := s.purgeKeyBlocks(ctx, "foo", map[cid.Cid]struct{}{c: {}}, true, false); err != nil { + t.Fatal(err) + } + if !fired { + t.Fatal("putHook must fire on partial purge, even when value is unchanged, to surface the priority change") + } + if gotEvent.OldPriority != 2 { + t.Errorf("OldPriority = %d, want 2", gotEvent.OldPriority) + } + if gotEvent.NewPriority != 1 { + t.Errorf("NewPriority = %d, want 1", gotEvent.NewPriority) + } + }) +} + +// blockIDToCid converts the blockKey returned by addElem (a datastore-keyed +// multihash) back into a CID, for use with purgeKeyBlocks. +func blockIDToCid(t *testing.T, blockID string) cid.Cid { + t.Helper() + mhash, err := dshelp.DsKeyToMultihash(ds.NewKey(blockID)) + if err != nil { + t.Fatalf("DsKeyToMultihash: %v", err) + } + return cid.NewCidV1(cid.DagProtobuf, mhash) +}