diff --git a/db/background_mgr.go b/db/background_mgr.go index dcfd7efc89..36f63274d0 100644 --- a/db/background_mgr.go +++ b/db/background_mgr.go @@ -543,6 +543,13 @@ func (b *BackgroundManager) setStartTime(startTime time.Time) { b.status.StartTime = startTime } +// getLastError returns the last error recorded by the manager. +func (b *BackgroundManager) getLastError() error { + b.lock.Lock() + defer b.lock.Unlock() + return b.lastError +} + // SetError sets the last known error, transitions the state to BackgroundManagerStateError and terminates the process. func (b *BackgroundManager) SetError(err error) { b.lock.Lock() diff --git a/db/background_mgr_invalidate_principals.go b/db/background_mgr_invalidate_principals.go new file mode 100644 index 0000000000..38e3159704 --- /dev/null +++ b/db/background_mgr_invalidate_principals.go @@ -0,0 +1,75 @@ +// Copyright 2012-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package db + +import ( + "context" + + "github.com/couchbase/sync_gateway/base" +) + +// ===================================================================== +// InvalidatePrincipals Background Manager +// ===================================================================== + +// InvalidatePrincipalsProcess implements the principal invalidation and syncInfo update steps that follow a resync. +type InvalidatePrincipalsProcess struct { + db *DatabaseContext + regenerateSequences bool + hasAllCollections bool + docsChanged int64 + collectionIDs []uint32 +} + +var _ BackgroundManagerProcessI = &InvalidatePrincipalsProcess{} + +// newInvalidatePrincipalsManager returns a local BackgroundManager that runs the principal invalidation process +// with the given parameters. +func newInvalidatePrincipalsManager(db *DatabaseContext, regenerateSequences bool, hasAllCollections bool, docsChanged int64, collectionIDs []uint32) *BackgroundManager { + return &BackgroundManager{ + name: "invalidate_principals", + Process: &InvalidatePrincipalsProcess{ + db: db, + regenerateSequences: regenerateSequences, + hasAllCollections: hasAllCollections, + docsChanged: docsChanged, + collectionIDs: collectionIDs, + }, + terminator: base.NewSafeTerminator(), + } +} + +func (p *InvalidatePrincipalsProcess) Init(_ context.Context, _ map[string]any, _ []byte) error { + return nil +} + +func (p *InvalidatePrincipalsProcess) Run(ctx context.Context, _ map[string]any, _ updateStatusCallbackFunc, terminator *base.SafeTerminator) error { + if terminator.IsClosed() { + return nil + } + if err := invalidatePrincipals(ctx, p.db, p.regenerateSequences, p.hasAllCollections, p.docsChanged); err != nil { + return err + } + if terminator.IsClosed() { + return nil + } + if p.regenerateSequences { + updateSyncInfo(ctx, p.db, p.collectionIDs) + } + return nil +} + +func (p *InvalidatePrincipalsProcess) GetProcessStatus(status BackgroundManagerStatus, _ []byte) ([]byte, []byte, error) { + out, err := base.JSONMarshal(status) + return out, nil, err +} + +func (p *InvalidatePrincipalsProcess) SetProcessStatus(_ context.Context, _, _ []byte) {} + +func (p *InvalidatePrincipalsProcess) ResetStatus() {} diff --git a/db/background_mgr_invalidate_principals_test.go b/db/background_mgr_invalidate_principals_test.go new file mode 100644 index 0000000000..32fc2c092b --- /dev/null +++ b/db/background_mgr_invalidate_principals_test.go @@ -0,0 +1,276 @@ +// Copyright 2012-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package db + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/couchbase/sync_gateway/base" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// blockableProcess is a BackgroundManagerProcessI whose Run blocks until released or the terminator fires. +// After the first run, subsequent runs complete immediately to simulate a successful restart. +type blockableProcess struct { + runCount atomic.Int32 + blockCh chan struct{} // close to unblock the first run +} + +func newBlockableProcess() *blockableProcess { + return &blockableProcess{blockCh: make(chan struct{})} +} + +func (b *blockableProcess) Init(_ context.Context, _ map[string]any, _ []byte) error { return nil } + +func (b *blockableProcess) Run(_ context.Context, _ map[string]any, _ updateStatusCallbackFunc, terminator *base.SafeTerminator) error { + count := b.runCount.Add(1) + if count == 1 { + select { + case <-b.blockCh: + case <-terminator.Done(): + } + } + return nil +} + +func (b *blockableProcess) GetProcessStatus(status BackgroundManagerStatus, _ []byte) ([]byte, []byte, error) { + out, err := base.JSONMarshal(status) + return out, nil, err +} + +func (b *blockableProcess) SetProcessStatus(_ context.Context, _, _ []byte) {} +func (b *blockableProcess) ResetStatus() {} + +// newBlockableManager returns a local BackgroundManager backed by a blockableProcess. +func newBlockableManager(p *blockableProcess) *BackgroundManager { + return &BackgroundManager{ + name: "test_invalidate_principals", + Process: p, + terminator: base.NewSafeTerminator(), + } +} + +// TestInvalidatePrincipalsManagerCompletes verifies that a manager whose process completes quickly transitions +// to BackgroundProcessStateCompleted. +func TestInvalidatePrincipalsManagerCompletes(t *testing.T) { + ctx := base.TestCtx(t) + + proc := newBlockableProcess() + close(proc.blockCh) // unblock immediately so Run returns right away + mgr := newBlockableManager(proc) + + require.NoError(t, mgr.Start(ctx, nil)) + + RequireBackgroundManagerState(t, mgr, BackgroundProcessStateCompleted) +} + +// TestRunInvalidatePrincipalsLoopCompletesNormally verifies that invalidatePrincipals returns nil +// when the manager completes successfully. +func TestRunInvalidatePrincipalsLoopCompletesNormally(t *testing.T) { + ctx := base.TestCtx(t) + + proc := newBlockableProcess() + close(proc.blockCh) // unblock immediately so the first Run returns right away + resync := &ResyncManagerDCP{ + invalidatePrincipalsManager: newBlockableManager(proc), + invalidatePrincipalsPollWait: time.Millisecond, + } + + terminator := base.NewSafeTerminator() + doneCh := make(chan error, 1) + go func() { + doneCh <- resync.invalidatePrincipals(ctx, nil, false, false, terminator) + }() + + require.NoError(t, base.RequireChanRecv(t, doneCh)) +} + +// TestRunInvalidatePrincipalsLoopNoOpIfAlreadyStarted verifies that the loop does not fail when Start +// returns errBackgroundManagerProcessAlreadyRunning; it waits and then proceeds once the manager completes. +func TestRunInvalidatePrincipalsLoopNoOpIfAlreadyStarted(t *testing.T) { + ctx := base.TestCtx(t) + + proc := newBlockableProcess() + mgr := newBlockableManager(proc) + + resync := &ResyncManagerDCP{ + invalidatePrincipalsManager: mgr, + invalidatePrincipalsPollWait: time.Millisecond, + } + terminator := base.NewSafeTerminator() + + // Start the manager externally so it is already running when the loop begins. + require.NoError(t, mgr.Start(ctx, nil)) + RequireBackgroundManagerState(t, mgr, BackgroundProcessStateRunning) + + doneCh := make(chan error, 1) + go func() { + doneCh <- resync.invalidatePrincipals(ctx, nil, false, false, terminator) + }() + + // Loop should be polling (not failed) while the manager is running. + select { + case err := <-doneCh: + t.Fatalf("loop returned prematurely: %v", err) + case <-time.After(100 * time.Millisecond): + } + + // Unblock the process — manager transitions to Completed and the loop returns. + close(proc.blockCh) + + require.NoError(t, base.RequireChanRecv(t, doneCh)) +} + +// TestRunInvalidatePrincipalsLoopRestartsAfterStop verifies that when the manager is stopped before +// completing, the loop restarts it and waits for the subsequent run to complete. +func TestRunInvalidatePrincipalsLoopRestartsAfterStop(t *testing.T) { + ctx := base.TestCtx(t) + + proc := newBlockableProcess() + mgr := newBlockableManager(proc) + + resync := &ResyncManagerDCP{ + invalidatePrincipalsManager: mgr, + invalidatePrincipalsPollWait: time.Millisecond, + } + terminator := base.NewSafeTerminator() + + doneCh := make(chan error, 1) + go func() { + doneCh <- resync.invalidatePrincipals(ctx, nil, false, false, terminator) + }() + + // Wait for the manager to start. + RequireBackgroundManagerState(t, mgr, BackgroundProcessStateRunning) + + // Loop should still be polling. + select { + case err := <-doneCh: + t.Fatalf("loop returned prematurely: %v", err) + case <-time.After(100 * time.Millisecond): + } + + // Stop the manager while it is blocked — simulates a crash or external stop. + require.NoError(t, mgr.Stop(ctx)) + + // The loop must detect the Stopped state and restart the manager. The second run (count >= 2) + // completes immediately, so the loop should finish successfully. We do not assert Stopped state + // here because the fast poll interval means the loop may have already restarted the manager by + // the time we check. + require.NoError(t, base.RequireChanRecv(t, doneCh)) + assert.GreaterOrEqual(t, int(proc.runCount.Load()), 2, "process should have been run at least twice") +} + +// slowBlockableProcess is a BlockableProcess whose first Run signals readyCh when it starts and then +// waits for blockCh to be closed before completing. This lets a test interleave an invalidatePrincipals +// call between "process is known to be running" and "process completes". +type slowBlockableProcess struct { + blockableProcess + readyCh chan struct{} // closed by Run once the first goroutine is inside the select +} + +func newSlowBlockableProcess() *slowBlockableProcess { + return &slowBlockableProcess{ + blockableProcess: blockableProcess{blockCh: make(chan struct{})}, + readyCh: make(chan struct{}), + } +} + +func (s *slowBlockableProcess) Run(ctx context.Context, opts map[string]any, cb updateStatusCallbackFunc, terminator *base.SafeTerminator) error { + count := s.runCount.Add(1) + if count == 1 { + close(s.readyCh) // signal that we are inside Run and blocking + select { + case <-s.blockCh: + case <-terminator.Done(): + } + } + return nil +} + +// TestRunInvalidatePrincipalsLoopAlreadyRunningCompletesBeforeWait reproduces the bug where Start() +// returns errBackgroundManagerProcessAlreadyRunning but the manager's process finishes before +// waitInvalidatePrincipals begins polling. +// +// The old implementation blocked on the manager's internal terminator channel. When the process +// completes between the Start() call and the select statement, the terminator fires immediately and +// the subsequent RetryLoop may see GetRunState() == Running (the state is updated after Terminate() +// closes the terminator), causing it to spin for up to 500 ms. +// +// The new implementation polls GetRunState on a ticker, so the terminal state is detected on the +// next tick regardless of how GetRunState looked at the moment the terminator fired. +func TestRunInvalidatePrincipalsLoopAlreadyRunningCompletesBeforeWait(t *testing.T) { + ctx := base.TestCtx(t) + + proc := newSlowBlockableProcess() + mgr := newBlockableManager(&proc.blockableProcess) + // Swap the Run implementation to the slow version that signals readyCh. + mgr.Process = proc + + resync := &ResyncManagerDCP{ + invalidatePrincipalsManager: mgr, + invalidatePrincipalsPollWait: time.Millisecond, + } + terminator := base.NewSafeTerminator() + + // Start the manager externally; the process blocks until we close blockCh. + require.NoError(t, mgr.Start(ctx, nil)) + + // Wait until the goroutine is inside Run so we know state == Running. + select { + case <-proc.readyCh: + case <-time.After(5 * time.Second): + t.Fatal("process goroutine did not start") + } + + doneCh := make(chan error, 1) + go func() { + doneCh <- resync.invalidatePrincipals(ctx, nil, false, false, terminator) + }() + + // invalidatePrincipals calls Start(), receives errBackgroundManagerProcessAlreadyRunning, and is + // about to call waitInvalidatePrincipals. Unblock the process now so that the manager transitions + // to Completed before (or just as) waitInvalidatePrincipals begins. The loop must correctly detect + // the completed state and return nil rather than hanging. + close(proc.blockCh) + + require.NoError(t, base.RequireChanRecv(t, doneCh)) +} + +// TestRunInvalidatePrincipalsLoopExitsOnTerminator verifies that the loop exits cleanly when the resync +// terminator fires while waiting for the manager to complete. +func TestRunInvalidatePrincipalsLoopExitsOnTerminator(t *testing.T) { + ctx := base.TestCtx(t) + + proc := newBlockableProcess() + mgr := newBlockableManager(proc) + + resync := &ResyncManagerDCP{ + invalidatePrincipalsManager: mgr, + invalidatePrincipalsPollWait: time.Millisecond, + } + terminator := base.NewSafeTerminator() + + doneCh := make(chan error, 1) + go func() { + doneCh <- resync.invalidatePrincipals(ctx, nil, false, false, terminator) + }() + + // Wait for the manager to start. + RequireBackgroundManagerState(t, mgr, BackgroundProcessStateRunning) + + // Close the resync terminator — the loop should exit without error. + terminator.Close() + + require.NoError(t, base.RequireChanRecv(t, doneCh)) +} diff --git a/db/background_mgr_resync_dcp.go b/db/background_mgr_resync_dcp.go index de6d827313..4d809f50ab 100644 --- a/db/background_mgr_resync_dcp.go +++ b/db/background_mgr_resync_dcp.go @@ -18,6 +18,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/couchbase/cbgt" sgbucket "github.com/couchbase/sg-bucket" @@ -50,7 +51,9 @@ type ResyncManagerDCP struct { dcpDoneChan chan error // mark when the DCP feed is completed // TODO: put this into data set by GetProcessStatus / SetProcessStatus so this can be determined for other nodes // running resync - completedvBuckets *vBucketTracker // tracks the number of completed vBuckets for the local + completedvBuckets *vBucketTracker // tracks the number of completed vBuckets for the local + invalidatePrincipalsManager *BackgroundManager + invalidatePrincipalsPollWait time.Duration // how long to wait between GetRunState polls; defaults to 500ms, overridable in tests } // vBucketTracker tracks completed vBuckets in a thread safe way. It is used to determine when all vBuckets have @@ -431,14 +434,9 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers return err } - if err := invalidatePrincipals(ctx, db, regenerateSequences, r.hasAllCollections, r.DocsChanged()); err != nil { + if err := r.invalidatePrincipals(ctx, db, regenerateSequences, r.hasAllCollections, terminator); err != nil { return err } - - // If we regenerated sequences, update syncInfo for all collections affected - if regenerateSequences { - updateSyncInfo(ctx, db, r.collectionIDs) - } // resync finished, drop back to 0 db.DbStats.Database().ResyncDocsTargeted.Set(0) case <-terminator.Done(): @@ -455,7 +453,73 @@ func (r *ResyncManagerDCP) Run(ctx context.Context, options map[string]any, pers return nil } -// invalidatePrincipals invalidates principal documents after documents have been resynced. +// invalidatePrincipals starts the invalidate principals background manager and polls until it completes. +// The cluster-aware background manager handling ensures that only a single node will be running invalidate principals - the rest wait for completion using waitInvalidatePrincipals. +// If the process is stopped before completing (e.g. due to a node failure), all remaining nodes will detect the process status going to stopped, and will attempt to restart. The loop exits +// when the manager completes successfully, returns error, or is stopped by the parent resync terminator. +func (r *ResyncManagerDCP) invalidatePrincipals(ctx context.Context, db *DatabaseContext, regenerateSequences bool, hasAllCollections bool, terminator *base.SafeTerminator) error { + if r.invalidatePrincipalsManager == nil { + r.invalidatePrincipalsManager = newInvalidatePrincipalsManager(db, regenerateSequences, hasAllCollections, r.DocsChanged(), r.collectionIDs) + } + + for { + // start a loop to invalidate principals on a single node + // All nodes race to start - one node will successfully start (with err = nil), all other nodes will return errBackgroundManagerProcessAlreadyRunning + if err := r.invalidatePrincipalsManager.Start(ctx, nil); err != nil && !errors.Is(err, errBackgroundManagerProcessAlreadyRunning) { + return err + } + // wait for background manager process to complete. If it moves to stopped without error, it indicates the node running the process has stopped. All remaining nodes will restart the loop and race to restart the principal invalidation. + // + // if Completed, return from function + done, err := r.waitInvalidatePrincipals(ctx, terminator) + if err != nil || done { + return err + } + } +} + +// waitInvalidatePrincipals polls GetRunState until the manager reaches a terminal state. Returns (true, nil) +// when the manager completes or the resync terminator fires, (false, nil) when the manager is stopped and +// should be restarted, and (false, err) on error. +// +// We poll GetRunState rather than blocking on the manager's internal terminator because Start may return +// errBackgroundManagerProcessAlreadyRunning (the manager is already running on this node). In that case we +// have no reliable way to obtain the exact terminator channel that belongs to the running goroutine without a +// data race, so it is simpler and safer to poll the public GetRunState API instead. +func (r *ResyncManagerDCP) waitInvalidatePrincipals(ctx context.Context, terminator *base.SafeTerminator) (done bool, err error) { + pollWait := r.invalidatePrincipalsPollWait + if pollWait == 0 { + pollWait = 500 * time.Millisecond + } + ticker := time.NewTicker(pollWait) + defer ticker.Stop() + for { + select { + case <-ticker.C: + state, err := r.invalidatePrincipalsManager.getClusterStatusState(ctx) + if err != nil { + base.WarnfCtx(ctx, "Could not get invalidate principals status: %v, continuing to poll", err) + } + switch state { + case BackgroundProcessStateCompleted: + return true, nil + case BackgroundProcessStateError: + return false, r.invalidatePrincipalsManager.getLastError() + case BackgroundProcessStateStopped: + return false, nil + case BackgroundProcessStateRunning, BackgroundProcessStateStopping: + // still in progress, keep polling + } + case <-terminator.Done(): + // if ResyncManager.Stop was called, stop the principal invalidation + if err := r.invalidatePrincipalsManager.Stop(ctx); err != nil { + base.WarnfCtx(ctx, "Failed to stop invalidate principals manager: %v", err) + } + return true, nil + } + } +} + func invalidatePrincipals(ctx context.Context, db *DatabaseContext, regenerateSequences bool, resyncAllCollections bool, docsChanged int64) error { // If the principal docs sequences are regenerated, or the user doc need to be invalidated after a dynamic channel grant, db.QueryPrincipals is called to find the principal docs. // In the case that a database is created with "start_offline": true, it is possible the index needed to create this is not yet ready, so make sure it is ready for use.