Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,14 @@ Status Cluster::Reset() {
migrated_slots_.clear();
imported_slots_.clear();

// The migrator's forbidden slot range persists past a successful migration
// and is only harmless while slots_nodes_[slot] no longer points at us.
// A subsequent SETNODES after reset can reassign that slot back here and
// spuriously reject writes with TRYAGAIN, so drop it explicitly.
if (srv_->slot_migrator) {
srv_->slot_migrator->ReleaseForbiddenSlotRange();
}

// unlink the cluster nodes file if exists
unlink(srv_->GetConfig()->NodesFilePath().data());
return Status::OK();
Expand Down
66 changes: 66 additions & 0 deletions tests/gocase/integration/slotmigrate/slotmigrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1422,3 +1422,69 @@ func TestSlotRangeMigrate(t *testing.T) {
})

}

// TestClusterResetPreservesForbiddenSlot reproduces a bug where the source
// node keeps its `forbidden_slot_range_` set after a slot has been migrated
// away. Under normal operation this is harmless because the topology has
// already switched the slot to the destination node, so the forbidden-slot
// check in CanExecByMySelf is never reached for that slot. However, after
// `CLUSTER RESET` clears the cluster topology, if the new topology assigns
// the previously-migrated slot back to this node, writes to that slot are
// spuriously rejected with `TRYAGAIN Can't write to slot being migrated
// which is in write forbidden phase`.
func TestClusterResetPreservesForbiddenSlot(t *testing.T) {
ctx := context.Background()

srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer func() { srv0.Close() }()
rdb0 := srv0.NewClient()
defer func() { require.NoError(t, rdb0.Close()) }()
id0 := "resetforbiddenxxxxxxxxxxxxxxxxxxxxxxxx00"
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())

srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer func() { srv1.Close() }()
rdb1 := srv1.NewClient()
defer func() { require.NoError(t, rdb1.Close()) }()
id1 := "resetforbiddenxxxxxxxxxxxxxxxxxxxxxxxx01"
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err())

setupCluster := func(version int) {
clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", id0, srv0.Host(), srv0.Port())
clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383", id1, srv1.Host(), srv1.Port())
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, strconv.Itoa(version)).Err())
require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, strconv.Itoa(version)).Err())
}

setupCluster(1)

// Migrate slot 3300 (the slot key "b" hashes to) from srv0 to srv1.
slot := 3300
require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val())
waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess)

// Update topology so slot 3300 now belongs to srv1.
srcVer, err := rdb0.Do(ctx, "clusterx", "version").Int()
require.NoError(t, err)
require.NoError(t, rdb0.Do(ctx, "clusterx", "setslot", slot, "node", id1, srcVer+1).Err())
dstVer, err := rdb1.Do(ctx, "clusterx", "version").Int()
require.NoError(t, err)
require.NoError(t, rdb1.Do(ctx, "clusterx", "setslot", slot, "node", id1, dstVer+1).Err())

// Empty the DB so `CLUSTER RESET` is allowed (it refuses on non-empty DBs).
require.NoError(t, rdb0.FlushDB(ctx).Err())

// CLUSTER RESET HARD — this clears cluster topology, node id, migrated_slots_,
// imported_slots_, etc., but does NOT reset the migrator's forbidden_slot_range_.
require.Equal(t, "OK", rdb0.Do(ctx, "cluster", "reset", "hard").Val())

// Recreate the cluster; srv0 now owns slot 3300 again.
require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err())
setupCluster(2)

// A write to a key hashing to slot 3300 must succeed. Before the fix this
// returns `TRYAGAIN Can't write to slot being migrated which is in write
// forbidden phase` because forbidden_slot_range_ still holds {3300, 3300}.
key := util.SlotTable[slot]
require.NoError(t, rdb0.Set(ctx, key, "1", 0).Err())
}
Loading