Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 99 additions & 36 deletions crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,52 @@ type SessionDAGService interface {
Session(context.Context) ipld.NodeGetter
}

// PutEvent describes a materialised-view update delivered to PutHook.
type PutEvent struct {
// Key whose value was updated.
Key ds.Key
// OldValue is the previous value for the key, if any. It is populated only
// when HookLoadPreviousValue is true, otherwise it is nil.
OldValue []byte
// NewValue is the new value for the key after the update.
NewValue []byte
// OldPriority is the priority of OldValue. It is populated only when
// HookLoadPreviousValue is true, otherwise it is 0. It is also 0 when the
// key had no prior value (real deltas always have priority >= 1, so a zero
// OldPriority means "no previous value"); in that case OldValue is nil as
// well.
OldPriority uint64
// NewPriority is the priority of NewValue. For normal puts this matches
// Delta.GetPriority(). For partial-tombstone puts (where a tombstone removed
// the previous winner and a surviving element took over), NewPriority is the
// surviving element's priority, which differs from the tombstone delta's
// priority.
NewPriority uint64
// Delta is the triggering delta: the one whose merge caused the state
// change. For partial-tombstone puts (where a tombstone removed the previous
// winner and a surviving element took over), Delta is the tombstone delta,
// not the older delta that originally wrote the surviving value. Delta is
// nil when the update did not originate from a merged delta (e.g. PurgeDAG).
// Consumers must not retain Delta past the callback or mutate it.
Delta Delta
}

// DeleteEvent describes a materialised-view removal delivered to DeleteHook.
type DeleteEvent struct {
// Key whose value was updated.
Key ds.Key
// LastValue is the previous value for the key, if any. It is populated only
// when HookLoadPreviousValue is true, otherwise it is nil.
LastValue []byte
// LastPriority is the priority of LastValue. It is populated only when
// HookLoadPreviousValue is true, otherwise it is 0.
LastPriority uint64
// Delta is the tombstone delta that triggered the removal, or nil when the
// removal did not originate from a merged delta (e.g. PurgeDAG). Consumers
// must not retain Delta past the callback or mutate it.
Delta Delta
}

// Options holds configurable values for Datastore.
type Options struct {
Logger logging.StandardLogger
Expand All @@ -85,19 +131,45 @@ type Options struct {
// been already broadcasted by other replicas in the
// interval. Default: 1m.
RebroadcastInterval time.Duration
// The PutHook function is triggered whenever an element
// is successfully added to the datastore (either by a local
// or remote update), and only when that addition is considered the
// prevalent value. Default: nil.
PutHook func(k ds.Key, v []byte)
// The DeleteHook function is triggered whenever a version of an
// element is successfully removed from the datastore (either by a
// local or remote update). Unordered and concurrent updates may
// result in the DeleteHook being triggered even though the element is
// still present in the datastore because it was re-added or not fully
// tombstoned. If that is relevant, use Has() to check if the removed
// element is still part of the datastore. Default: nil.
DeleteHook func(k ds.Key)
// PutHook is triggered whenever the materialised view for a key is
// updated (either by a local or remote update). It fires once per key
// per merged delta, even when the delta carries multiple elements for
// the same key — in that case the lex-largest value wins at the shared
// delta priority and only the winner is reported. For partial-tombstone
// puts (where a tombstone removed the previous winner and a surviving
// element took over), the hook fires with the surviving value.
// Default: nil.
//
// PutEvent.OldValue and PutEvent.OldPriority are populated only when
// HookLoadPreviousValue is true, otherwise they are zero-valued. When
// HookLoadPreviousValue is true, the hook is not fired for partial
// tombstones where the winning element is unchanged (same value and
// priority); a same-value/different-priority swap still fires the hook.
//
// The callback is invoked while internal locks are held; it must not
// call back into the Datastore or it will deadlock.
PutHook func(PutEvent)
// DeleteHook is triggered whenever an element is fully removed from the
// datastore (either by a local or remote update), i.e. when no surviving
// element exists for the key after tombstone processing. Default: nil.
//
// DeleteEvent.LastValue and DeleteEvent.LastPriority are populated only when
// HookLoadPreviousValue is true, otherwise they are zero-valued. When
// HookLoadPreviousValue is true, the hook is not fired for tombstones
// targeting keys that had no prior value in the datastore.
//
// The callback is invoked while internal locks are held; it must not
// call back into the Datastore or it will deadlock.
DeleteHook func(DeleteEvent)
// HookLoadPreviousValue controls whether PutHook/DeleteHook receive the
// previous value and priority for the key. When true, the datastore is read
// before each hook-relevant write so that PutEvent.OldValue /
// PutEvent.OldPriority and DeleteEvent.LastValue / DeleteEvent.LastPriority
// can be populated; this incurs one extra read per triggered hook. When
// true, the hook is also skipped when the observed value would not actually
// change (see PutHook/DeleteHook doc).
// Default: false.
HookLoadPreviousValue bool
// NumWorkers specifies the number of workers ready to
// retrieve and merge deltas while walking the DAGs. Default:
// 5.
Expand Down Expand Up @@ -183,12 +255,13 @@ func (opts *Options) verify() error {
// DefaultOptions initializes an Options object with sensible defaults.
func DefaultOptions() *Options {
return &Options{
Logger: logging.Logger("crdt"),
RebroadcastInterval: time.Minute,
PutHook: nil,
DeleteHook: nil,
NumWorkers: 5,
DAGSyncerTimeout: 5 * time.Minute,
Logger: logging.Logger("crdt"),
RebroadcastInterval: time.Minute,
PutHook: nil,
DeleteHook: nil,
HookLoadPreviousValue: false,
NumWorkers: 5,
DAGSyncerTimeout: 5 * time.Minute,
// always keeping
// https://github.com/libp2p/go-libp2p-core/blob/master/network/network.go#L23
// in sight
Expand Down Expand Up @@ -292,6 +365,12 @@ func New(
return nil, err
}

hooks := setHooks{
putHook: opts.PutHook,
deleteHook: opts.DeleteHook,
hookLoadPreviousValue: opts.HookLoadPreviousValue,
}

// <namespace>/set
fullSetNs := namespace.ChildString(opts.crdtOpts.Namespaces.Set)
// <namespace>/heads
Expand All @@ -300,24 +379,8 @@ func New(
// <namespace>/heads
fullDagHeadsNs := namespace.ChildString(opts.crdtOpts.Namespaces.DAGHeads)

setPutHook := func(k string, v []byte) {
if opts.PutHook == nil {
return
}
dsk := ds.NewKey(k)
opts.PutHook(dsk, v)
}

setDeleteHook := func(k string) {
if opts.DeleteHook == nil {
return
}
dsk := ds.NewKey(k)
opts.DeleteHook(dsk)
}

ctx, cancel := context.WithCancel(context.Background())
set, err := newCRDTSet(ctx, store, fullSetNs, dagSyncer, opts.Logger, setPutHook, setDeleteHook, opts.crdtOpts.DeltaFactory)
set, err := newCRDTSet(ctx, store, fullSetNs, dagSyncer, opts.Logger, hooks, opts.crdtOpts.DeltaFactory)
if err != nil {
cancel()
return nil, fmt.Errorf("error setting up crdt set: %w", err)
Expand Down
201 changes: 173 additions & 28 deletions crdt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"sync/atomic"
"testing"
"testing/synctest"
"time"

blockstore "github.com/ipfs/boxo/blockstore"
Expand Down Expand Up @@ -566,39 +567,183 @@ func TestCRDTPrintDAG(t *testing.T) {
}

func TestCRDTHooks(t *testing.T) {
ctx := context.Background()
synctest.Test(t, func(t *testing.T) {
ctx := t.Context()
var put int64
var deleted int64

opts := DefaultOptions()
opts.PutHook = func(PutEvent) {
atomic.AddInt64(&put, 1)
}
opts.DeleteHook = func(DeleteEvent) {
atomic.AddInt64(&deleted, 1)
}

var put int64
var deleted int64
replicas, closeReplicas := makeReplicas(t, opts)
t.Cleanup(closeReplicas)

opts := DefaultOptions()
opts.PutHook = func(k ds.Key, v []byte) {
atomic.AddInt64(&put, 1)
}
opts.DeleteHook = func(k ds.Key) {
atomic.AddInt64(&deleted, 1)
}
k := ds.RandomKey()
if err := replicas[0].Put(ctx, k, nil); err != nil {
t.Fatal(err)
}
if err := replicas[0].Delete(ctx, k); err != nil {
t.Fatal(err)
}
synctest.Wait()
if atomic.LoadInt64(&put) != int64(len(replicas)) {
t.Error("all replicas should have notified Put", put)
}
if atomic.LoadInt64(&deleted) != int64(len(replicas)) {
t.Error("all replicas should have notified Remove", deleted)
}
})
}

replicas, closeReplicas := makeReplicas(t, opts)
defer closeReplicas()
// TestCRDTPutHookLoadsPreviousValue verifies that PutHook receives OldValue
// populated when HookLoadPreviousValue is enabled.
func TestCRDTPutHookLoadsPreviousValue(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
type hookCall struct {
key ds.Key
newVal []byte
oldVal []byte
prio uint64
}
var calls []hookCall
var mu sync.Mutex

opts := DefaultOptions()
opts.HookLoadPreviousValue = true
opts.PutHook = func(e PutEvent) {
mu.Lock()
var prio uint64
if e.Delta != nil {
prio = e.Delta.GetPriority()
}
calls = append(calls, hookCall{e.Key, e.NewValue, e.OldValue, prio})
mu.Unlock()
}

k := ds.RandomKey()
err := replicas[0].Put(ctx, k, nil)
if err != nil {
t.Fatal(err)
}
replicas, closeReplicas := makeReplicas(t, opts)
t.Cleanup(closeReplicas)

err = replicas[0].Delete(ctx, k)
if err != nil {
t.Fatal(err)
}
time.Sleep(100 * time.Millisecond)
if atomic.LoadInt64(&put) != int64(len(replicas)) {
t.Error("all replicas should have notified Put", put)
}
if atomic.LoadInt64(&deleted) != int64(len(replicas)) {
t.Error("all replicas should have notified Remove", deleted)
}
k := ds.NewKey("/testkey")
ctx := t.Context()

// First put: all replicas should fire PutHook with newVal="first", oldVal=nil.
if err := replicas[0].Put(ctx, k, []byte("first")); err != nil {
t.Fatal(err)
}
synctest.Wait()

mu.Lock()
if len(calls) != len(replicas) {
t.Errorf("expected %d PutHook calls (one per replica), got %d", len(replicas), len(calls))
}
for i, c := range calls {
if string(c.newVal) != "first" {
t.Errorf("call[%d]: expected newVal %q, got %q", i, "first", c.newVal)
}
if c.oldVal != nil {
t.Errorf("call[%d]: expected oldVal nil on first put, got %q", i, c.oldVal)
}
if c.prio == 0 {
t.Errorf("call[%d]: expected non-zero delta priority, got 0 (delta not forwarded?)", i)
}
}
calls = nil
mu.Unlock()

// Second put: all replicas should fire PutHook with newVal="second", oldVal="first".
if err := replicas[0].Put(ctx, k, []byte("second")); err != nil {
t.Fatal(err)
}
synctest.Wait()

mu.Lock()
defer mu.Unlock()
if len(calls) != len(replicas) {
t.Errorf("expected %d PutHook calls on second put, got %d", len(replicas), len(calls))
}
for i, c := range calls {
if string(c.newVal) != "second" {
t.Errorf("call[%d]: expected newVal %q, got %q", i, "second", c.newVal)
}
if string(c.oldVal) != "first" {
t.Errorf("call[%d]: expected oldVal %q, got %q", i, "first", c.oldVal)
}
}
})
}

// TestCRDTDeleteHookLoadsPreviousValue verifies that DeleteHook receives
// LastValue populated when HookLoadPreviousValue is enabled, and is suppressed for
// tombstones targeting keys with no prior value.
func TestCRDTDeleteHookLoadsPreviousValue(t *testing.T) {
t.Parallel()
synctest.Test(t, func(t *testing.T) {
type hookCall struct {
key ds.Key
lastVal []byte
hasDelta bool
}
var calls []hookCall
var mu sync.Mutex

opts := DefaultOptions()
opts.HookLoadPreviousValue = true
opts.DeleteHook = func(e DeleteEvent) {
mu.Lock()
calls = append(calls, hookCall{e.Key, e.LastValue, e.Delta != nil})
mu.Unlock()
}

replicas, closeReplicas := makeReplicas(t, opts)
t.Cleanup(closeReplicas)

ctx := t.Context()
k := ds.NewKey("/testkey")
if err := replicas[0].Put(ctx, k, []byte("hello")); err != nil {
t.Fatal(err)
}
synctest.Wait()

if err := replicas[0].Delete(ctx, k); err != nil {
t.Fatal(err)
}
synctest.Wait()

mu.Lock()
if len(calls) != len(replicas) {
t.Errorf("expected %d DeleteHook calls (one per replica), got %d", len(replicas), len(calls))
}
for i, c := range calls {
if string(c.lastVal) != "hello" {
t.Errorf("call[%d]: expected lastVal %q, got %q", i, "hello", c.lastVal)
}
if !c.hasDelta {
t.Errorf("call[%d]: expected non-nil Delta from tombstone merge, got nil", i)
}
}
calls = nil
mu.Unlock()

// Delete the same key again — it no longer exists. Rmv produces no
// tombstones so Delete returns early without publishing — the hook
// must not fire.
if err := replicas[0].Delete(ctx, k); err != nil {
t.Fatal(err)
}
synctest.Wait()

mu.Lock()
defer mu.Unlock()
if n := len(calls); n != 0 {
t.Errorf("DeleteHook should not be called for a non-existent key, got %d calls", n)
}
})
}

func TestCRDTBatch(t *testing.T) {
Expand Down
Loading