From 5c0467b6689389cbf1203f3680f4ca2a437b5b60 Mon Sep 17 00:00:00 2001 From: Ying Huang Date: Fri, 16 Sep 2022 17:57:15 +0000 Subject: [PATCH 1/7] Split vNode based on requested host number --- .../pkg/distributor/distributor.go | 13 ++++-- .../pkg/distributor/storage/nodestore.go | 46 ++++++++++++++++++- .../pkg/distributor/storage/nodestore_test.go | 17 +++++++ 3 files changed, 70 insertions(+), 6 deletions(-) create mode 100644 resource-management/pkg/distributor/storage/nodestore_test.go diff --git a/resource-management/pkg/distributor/distributor.go b/resource-management/pkg/distributor/distributor.go index 5974432..ab41c71 100644 --- a/resource-management/pkg/distributor/distributor.go +++ b/resource-management/pkg/distributor/distributor.go @@ -131,10 +131,15 @@ func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requested assignedHostCount := 0 hostAssignIsOK := false for i := 0; i < len(storesToSelectInorder); i++ { - selectedStores = append(selectedStores, storesToSelectInorder[i]) - assignedHostCount += (*storesToSelectInorder[i]).GetHostNum() - if assignedHostCount >= requestedHostNum { - hostAssignIsOK = true + newHostCount := assignedHostCount + (*storesToSelectInorder[i]).GetHostNum() + if newHostCount <= requestedHostNum { + selectedStores = append(selectedStores, storesToSelectInorder[i]) + assignedHostCount = newHostCount + if newHostCount == requestedHostNum { + hostAssignIsOK = true + break + } + } else { // split vNode break } } diff --git a/resource-management/pkg/distributor/storage/nodestore.go b/resource-management/pkg/distributor/storage/nodestore.go index b15d4c6..8381ce3 100644 --- a/resource-management/pkg/distributor/storage/nodestore.go +++ b/resource-management/pkg/distributor/storage/nodestore.go @@ -19,6 +19,7 @@ package storage import ( "k8s.io/klog/v2" "math" + "sort" "sync" "global-resource-service/resource-management/pkg/common-lib/hash" @@ -38,8 +39,8 @@ const ( type VirtualNodeStore struct { mu sync.RWMutex nodeEventByHash map[float64]*node.ManagedNodeEvent - lowerbound float64 - upperbound float64 + lowerbound float64 // inclusive + upperbound float64 // exclusive // one virtual store can only have nodes from one resource partition location location.Location @@ -120,6 +121,47 @@ func (vs *VirtualNodeStore) GenerateBookmarkEvent() *node.ManagedNodeEvent { return nil } +// Input requested host count +// return pointer to new virtual node store - splitted from current virtual node store +func (vs *VirtualNodeStore) Split(requestedHostCount int) *VirtualNodeStore { + vs.mu.Lock() + defer vs.mu.Unlock() + currentSize := len(vs.nodeEventByHash) + if currentSize <= requestedHostCount { + klog.Errorf("Requested to split virtual node when capacity (%d) is less than or equal to requested host count (%d)", currentSize, requestedHostCount) + return nil + } + + nodeHashes := make([]float64, currentSize) + index := 0 + for k, _ := range vs.nodeEventByHash { + nodeHashes[index] = k + index++ + } + sort.Float64s(nodeHashes) + newUBoundForCurrentVS := nodeHashes[requestedHostCount] + vs.upperbound = newUBoundForCurrentVS + + // create new virtual node store + newVSStore := &VirtualNodeStore{ + mu: sync.RWMutex{}, + nodeEventByHash: make(map[float64]*node.ManagedNodeEvent, currentSize-requestedHostCount), + lowerbound: newUBoundForCurrentVS, + upperbound: vs.upperbound, + location: vs.location, + } + + // move nodes from existing vs to new vs + for i := requestedHostCount; i < currentSize; i++ { + nodeHashKey := nodeHashes[i] + newVSStore.nodeEventByHash[nodeHashKey] = vs.nodeEventByHash[nodeHashKey] + delete(vs.nodeEventByHash, nodeHashKey) + } + + // TODO Add new virtual node into store + return newVSStore +} + type NodeStore struct { // granularity of the ring - degree for each virtual node managed arc granularOfRing float64 diff --git a/resource-management/pkg/distributor/storage/nodestore_test.go b/resource-management/pkg/distributor/storage/nodestore_test.go new file mode 100644 index 0000000..5f83a74 --- /dev/null +++ b/resource-management/pkg/distributor/storage/nodestore_test.go @@ -0,0 +1,17 @@ +/* +Copyright 2022 Authors of Global Resource Service. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage From d5df3e6363c6ccd12d32bd2f9868954a26c2e1bb Mon Sep 17 00:00:00 2001 From: Ying Huang Date: Tue, 20 Sep 2022 15:16:04 +0000 Subject: [PATCH 2/7] Add new virtual node as child node --- .../pkg/distributor/storage/nodestore.go | 85 ++++++++++++------- 1 file changed, 52 insertions(+), 33 deletions(-) diff --git a/resource-management/pkg/distributor/storage/nodestore.go b/resource-management/pkg/distributor/storage/nodestore.go index 8381ce3..47b5280 100644 --- a/resource-management/pkg/distributor/storage/nodestore.go +++ b/resource-management/pkg/distributor/storage/nodestore.go @@ -36,16 +36,34 @@ const ( BatchPersistSize = 100 ) +// Maximal two levels: +// 1. Original VS +// 2. Splitted VirtualNodeStores type VirtualNodeStore struct { - mu sync.RWMutex + mu sync.RWMutex + + // node events belong to current VS nodeEventByHash map[float64]*node.ManagedNodeEvent - lowerbound float64 // inclusive - upperbound float64 // exclusive + // lower bound for current VS - inclusive - immutable for top level VS + lowerbound float64 + // upper bound for current VS - exclusive - immutable for top level VS + upperbound float64 + // effective upper bound for top level VS - same for child VS + adjustedUpperBound float64 + + // empty for child VS + splittVirtualNodeStores []*VirtualNodeStore + + // nil for top level VS + parentVirtualNodeStore *VirtualNodeStore // one virtual store can only have nodes from one resource partition location location.Location - clientId string + // client current VS is assigned to + clientId string + + // event queue that send node events to eventQueue *cache.NodeEventQueue } @@ -122,44 +140,45 @@ func (vs *VirtualNodeStore) GenerateBookmarkEvent() *node.ManagedNodeEvent { } // Input requested host count -// return pointer to new virtual node store - splitted from current virtual node store -func (vs *VirtualNodeStore) Split(requestedHostCount int) *VirtualNodeStore { +func (vs *VirtualNodeStore) AdjustCapacity(requestedHostCount int) { vs.mu.Lock() defer vs.mu.Unlock() currentSize := len(vs.nodeEventByHash) if currentSize <= requestedHostCount { klog.Errorf("Requested to split virtual node when capacity (%d) is less than or equal to requested host count (%d)", currentSize, requestedHostCount) - return nil + return } - nodeHashes := make([]float64, currentSize) - index := 0 - for k, _ := range vs.nodeEventByHash { - nodeHashes[index] = k - index++ - } - sort.Float64s(nodeHashes) - newUBoundForCurrentVS := nodeHashes[requestedHostCount] - vs.upperbound = newUBoundForCurrentVS - - // create new virtual node store - newVSStore := &VirtualNodeStore{ - mu: sync.RWMutex{}, - nodeEventByHash: make(map[float64]*node.ManagedNodeEvent, currentSize-requestedHostCount), - lowerbound: newUBoundForCurrentVS, - upperbound: vs.upperbound, - location: vs.location, - } + // first splitted vs + if vs.parentVirtualNodeStore == nil && len(vs.splittVirtualNodeStores) == 0 { + nodeHashes := make([]float64, currentSize) + index := 0 + for k, _ := range vs.nodeEventByHash { + nodeHashes[index] = k + index++ + } + sort.Float64s(nodeHashes) + newUBoundForCurrentVS := nodeHashes[requestedHostCount] + vs.adjustedUpperBound = newUBoundForCurrentVS + + // create new virtual node store + newVSStore := &VirtualNodeStore{ + mu: sync.RWMutex{}, + nodeEventByHash: make(map[float64]*node.ManagedNodeEvent, currentSize-requestedHostCount), + lowerbound: newUBoundForCurrentVS, + upperbound: vs.upperbound, + location: vs.location, + } - // move nodes from existing vs to new vs - for i := requestedHostCount; i < currentSize; i++ { - nodeHashKey := nodeHashes[i] - newVSStore.nodeEventByHash[nodeHashKey] = vs.nodeEventByHash[nodeHashKey] - delete(vs.nodeEventByHash, nodeHashKey) - } + // move nodes from existing vs to new vs + for i := requestedHostCount; i < currentSize; i++ { + nodeHashKey := nodeHashes[i] + newVSStore.nodeEventByHash[nodeHashKey] = vs.nodeEventByHash[nodeHashKey] + delete(vs.nodeEventByHash, nodeHashKey) + } - // TODO Add new virtual node into store - return newVSStore + // Add new virtual node into store + } } type NodeStore struct { From 962f487c48a743e8b853c535377b9a40b4a79f48 Mon Sep 17 00:00:00 2001 From: Ying Huang Date: Wed, 12 Oct 2022 00:03:04 +0000 Subject: [PATCH 3/7] Generalize moveNodes func + case 1: first splitted vs --- .../pkg/distributor/storage/nodestore.go | 201 +++++++++++++++--- .../pkg/distributor/storage/nodestore_test.go | 100 +++++++++ 2 files changed, 275 insertions(+), 26 deletions(-) diff --git a/resource-management/pkg/distributor/storage/nodestore.go b/resource-management/pkg/distributor/storage/nodestore.go index 47b5280..f9ab18b 100644 --- a/resource-management/pkg/distributor/storage/nodestore.go +++ b/resource-management/pkg/distributor/storage/nodestore.go @@ -44,11 +44,14 @@ type VirtualNodeStore struct { // node events belong to current VS nodeEventByHash map[float64]*node.ManagedNodeEvent - // lower bound for current VS - inclusive - immutable for top level VS + // lower bound - inclusive - immutable for top level VS, not used for child vs lowerbound float64 - // upper bound for current VS - exclusive - immutable for top level VS + // upper bound - exclusive - immutable for top level VS, not used for child vs upperbound float64 - // effective upper bound for top level VS - same for child VS + + // effective lower bound + adjustedLowerBound float64 + // effective upper bound adjustedUpperBound float64 // empty for child VS @@ -140,45 +143,191 @@ func (vs *VirtualNodeStore) GenerateBookmarkEvent() *node.ManagedNodeEvent { } // Input requested host count -func (vs *VirtualNodeStore) AdjustCapacity(requestedHostCount int) { +// TODO: error cases +func (vs *VirtualNodeStore) AdjustCapacity(vNStoAdjust *VirtualNodeStore, requestedHostCount int) { vs.mu.Lock() defer vs.mu.Unlock() - currentSize := len(vs.nodeEventByHash) + + currentSize := len(vNStoAdjust.nodeEventByHash) if currentSize <= requestedHostCount { klog.Errorf("Requested to split virtual node when capacity (%d) is less than or equal to requested host count (%d)", currentSize, requestedHostCount) return } + if vs.parentVirtualNodeStore != nil { + klog.Error("VS capacity adjustment can only be done from parent node") + return + } - // first splitted vs - if vs.parentVirtualNodeStore == nil && len(vs.splittVirtualNodeStores) == 0 { - nodeHashes := make([]float64, currentSize) - index := 0 - for k, _ := range vs.nodeEventByHash { - nodeHashes[index] = k - index++ + if len(vs.splittVirtualNodeStores) == 0 { // first splitted vs + if vs != vNStoAdjust { + klog.Error("Invalid virtual node status: first time split with unmatched virtual node stores") + return } - sort.Float64s(nodeHashes) - newUBoundForCurrentVS := nodeHashes[requestedHostCount] - vs.adjustedUpperBound = newUBoundForCurrentVS // create new virtual node store newVSStore := &VirtualNodeStore{ - mu: sync.RWMutex{}, - nodeEventByHash: make(map[float64]*node.ManagedNodeEvent, currentSize-requestedHostCount), - lowerbound: newUBoundForCurrentVS, - upperbound: vs.upperbound, - location: vs.location, + mu: sync.RWMutex{}, + nodeEventByHash: make(map[float64]*node.ManagedNodeEvent, currentSize-requestedHostCount), + location: vs.location, + parentVirtualNodeStore: vs, + adjustedLowerBound: vs.lowerbound, + adjustedUpperBound: vs.upperbound, } + vs.adjustedLowerBound = vs.lowerbound + vs.adjustedUpperBound = vs.upperbound + vs.moveNodes(vs, newVSStore, currentSize-requestedHostCount, true) - // move nodes from existing vs to new vs - for i := requestedHostCount; i < currentSize; i++ { - nodeHashKey := nodeHashes[i] - newVSStore.nodeEventByHash[nodeHashKey] = vs.nodeEventByHash[nodeHashKey] - delete(vs.nodeEventByHash, nodeHashKey) + // Add new virtual node into store + vs.splittVirtualNodeStores = make([]*VirtualNodeStore, 2) + vs.splittVirtualNodeStores[0] = vs + vs.splittVirtualNodeStores[1] = newVSStore + } else { + currentIndex := vs.findIndexOfVS(vNStoAdjust) + if currentIndex < 0 { + klog.Error("Invalid VirtualNodeStore config") + return } - // Add new virtual node into store + newVSStore, countFromHighEnd := vs.findAdjacentVacantStore(currentIndex) + vs.moveNodes(vNStoAdjust, newVSStore, currentSize-requestedHostCount, countFromHighEnd) + // sort splittVirtualNodeStores by range + sort.Sort(vs) + } +} + +func (vs *VirtualNodeStore) findIndexOfVS(vsToLocate *VirtualNodeStore) int { + for i := 0; i < len(vs.splittVirtualNodeStores); i++ { + if vs.splittVirtualNodeStores[i] == vsToLocate { + return i + } + } + return -1 +} + +// Return: +// 1. virtualNodeStore that is free and can move nodes to, if there is no, create a new one +// 2. False if adjancent VS is to the left of store[index], true if adjancent VS is to the right of the store[index] +func (vs *VirtualNodeStore) findAdjacentVacantStore(index int) (*VirtualNodeStore, bool) { + if index == 0 { // leftmost + if vs.splittVirtualNodeStores[1].clientId == "" { // assigned + return vs.splittVirtualNodeStores[1], true + } + } else if index == len(vs.splittVirtualNodeStores)-1 { // rightmost + if vs.splittVirtualNodeStores[index-1].clientId == "" { + return vs.splittVirtualNodeStores[index-1], false + } + } else { + if vs.splittVirtualNodeStores[index-1].clientId == "" { + return vs.splittVirtualNodeStores[index-1], false + } else if vs.splittVirtualNodeStores[index+1].clientId == "" { + return vs.splittVirtualNodeStores[index+1], true + } + } + + // create new store + newVSStore := &VirtualNodeStore{ + mu: sync.RWMutex{}, + nodeEventByHash: make(map[float64]*node.ManagedNodeEvent), + location: vs.location, + parentVirtualNodeStore: vs, + lowerbound: -1, + upperbound: -1, + } + vs.splittVirtualNodeStores = append(vs.splittVirtualNodeStores, newVSStore) + return newVSStore, true +} + +// Move #hostCount from sourceVNS to dstVNS +// If countFromHighEnd == true, choose hash value from biggest (move to right store) +// If countFromHighEnd == false, choose hash value from smallest (move to left store) +// Internal function, assume input is valid, no need to validate +func (vs *VirtualNodeStore) moveNodes(sourceVNS, dstVNS *VirtualNodeStore, hostCountToMove int, countFromHighEnd bool) error { + // optimization + if len(dstVNS.nodeEventByHash) == 0 && len(sourceVNS.nodeEventByHash) < hostCountToMove*2 { + switchVirtualNodeStore(sourceVNS, dstVNS) + sourceVNS, dstVNS = dstVNS, sourceVNS + hostCountToMove = len(sourceVNS.nodeEventByHash) - hostCountToMove + countFromHighEnd = !countFromHighEnd } + + // sort nodes in sourceVNS by hash value + nodeHashes := make([]float64, len(sourceVNS.nodeEventByHash)) + index := 0 + for k, _ := range sourceVNS.nodeEventByHash { + nodeHashes[index] = k + index++ + } + sort.Float64s(nodeHashes) + + // starting position and bound adjustment + start := 0 + if countFromHighEnd { + start = index - 1 + sourceVNS.adjustedUpperBound = nodeHashes[len(sourceVNS.nodeEventByHash)-hostCountToMove] + dstVNS.adjustedLowerBound = sourceVNS.adjustedUpperBound + } else { + sourceVNS.adjustedLowerBound = nodeHashes[hostCountToMove] + dstVNS.adjustedUpperBound = sourceVNS.adjustedLowerBound + } + count := 0 + + // move nodes + for { + nodeHashKey := nodeHashes[start] + dstVNS.nodeEventByHash[nodeHashKey] = sourceVNS.nodeEventByHash[nodeHashKey] + delete(sourceVNS.nodeEventByHash, nodeHashKey) + count++ + + if count < hostCountToMove { + if countFromHighEnd { + start-- + } else { + start++ + } + } else { + break + } + } + + return nil +} + +// This function switch all elements of store1 and store2 +// No need to switch location - it should always be the same (otherwise, nodes cannot be switched) +func switchVirtualNodeStore(store1, store2 *VirtualNodeStore) { + // lock cannot be switched + //store1.mu, store2.mu = store2.mu, store1.mu + store1.nodeEventByHash, store2.nodeEventByHash = store2.nodeEventByHash, store1.nodeEventByHash + + // lower and upper bounds are immutable + //store1.lowerbound, store2.lowerbound = store2.lowerbound, store1.lowerbound + //store1.upperbound, store2.upperbound = store2.upperbound, store1.upperbound + store1.adjustedLowerBound, store2.adjustedLowerBound = store2.adjustedLowerBound, store1.adjustedLowerBound + store1.adjustedUpperBound, store2.adjustedUpperBound = store2.adjustedUpperBound, store1.adjustedUpperBound + + store1.splittVirtualNodeStores, store2.splittVirtualNodeStores = store2.splittVirtualNodeStores, store1.splittVirtualNodeStores + // parent stores CANNOT be switched + //store1.parentVirtualNodeStore, store2.parentVirtualNodeStore = store2.parentVirtualNodeStore, store1.parentVirtualNodeStore + + if store1.clientId != store2.clientId { + store1.clientId, store2.clientId = store2.clientId, store1.clientId + } + if store1.eventQueue != store2.eventQueue { + store1.eventQueue, store2.eventQueue = store2.eventQueue, store1.eventQueue + } +} + +// Implement sort.Interface +func (vs *VirtualNodeStore) Len() int { + return len(vs.splittVirtualNodeStores) +} + +func (vs *VirtualNodeStore) Less(i, j int) bool { + return vs.splittVirtualNodeStores[i].adjustedLowerBound < vs.splittVirtualNodeStores[j].adjustedLowerBound +} + +func (vs *VirtualNodeStore) Swap(i, j int) { + vs.splittVirtualNodeStores[i], vs.splittVirtualNodeStores[j] = vs.splittVirtualNodeStores[j], vs.splittVirtualNodeStores[i] } type NodeStore struct { diff --git a/resource-management/pkg/distributor/storage/nodestore_test.go b/resource-management/pkg/distributor/storage/nodestore_test.go index 5f83a74..647b943 100644 --- a/resource-management/pkg/distributor/storage/nodestore_test.go +++ b/resource-management/pkg/distributor/storage/nodestore_test.go @@ -15,3 +15,103 @@ limitations under the License. */ package storage + +import ( + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "global-resource-service/resource-management/pkg/common-lib/hash" + "global-resource-service/resource-management/pkg/common-lib/types" + "global-resource-service/resource-management/pkg/common-lib/types/location" + "global-resource-service/resource-management/pkg/common-lib/types/runtime" + "global-resource-service/resource-management/pkg/distributor/node" + "math" + "strconv" + "sync" + "testing" +) + +func TestAdjustCapacity_1stSplit(t *testing.T) { + for i := 25; i < 80; i++ { + for j := 1; j < i; j++ { + vs := generateInitVirtalNodes(i) + lowerBoundOld := vs.lowerbound + upperBoundOld := vs.upperbound + vsPointerOld := vs + + vs.AdjustCapacity(vs, j) + + // check result + assert.Nil(t, vs.parentVirtualNodeStore) + assert.Equal(t, j, len(vs.nodeEventByHash)) + + assert.Equal(t, 2, len(vs.splittVirtualNodeStores)) + assert.NotEqual(t, vs.splittVirtualNodeStores[0], vs.splittVirtualNodeStores[1]) + assert.Equal(t, i, len(vs.splittVirtualNodeStores[0].nodeEventByHash)+len(vs.splittVirtualNodeStores[1].nodeEventByHash)) + + assert.True(t, vs.splittVirtualNodeStores[0].adjustedLowerBound < vs.splittVirtualNodeStores[1].adjustedLowerBound) + assert.True(t, vs.splittVirtualNodeStores[0].adjustedUpperBound < vs.splittVirtualNodeStores[1].adjustedUpperBound) + assert.Equal(t, vs.splittVirtualNodeStores[0].adjustedUpperBound, vs.splittVirtualNodeStores[1].adjustedLowerBound) + + // init vs has immutable lower & upper bound + assert.Equal(t, lowerBoundOld, vs.lowerbound) + assert.Equal(t, upperBoundOld, vs.upperbound) + assert.Equal(t, vs.lowerbound, vs.splittVirtualNodeStores[0].adjustedLowerBound) + assert.Equal(t, vs.upperbound, vs.splittVirtualNodeStores[1].adjustedUpperBound) + vs.mu.Lock() + vs.mu.Unlock() + + // has one and only one store as parent + assert.Equal(t, vsPointerOld, vs) + assert.Nil(t, vs.splittVirtualNodeStores[0].parentVirtualNodeStore) + assert.Equal(t, vs, vs.splittVirtualNodeStores[1].parentVirtualNodeStore) + } + } +} + +func TestAdjustCapacity_Merge(t *testing.T) { + // one +} + +func generateInitVirtalNodes(hostNum int) *VirtualNodeStore { + loc := location.NewLocation(location.Beijing, location.ResourcePartition2) + vs := &VirtualNodeStore{ + mu: sync.RWMutex{}, + nodeEventByHash: make(map[float64]*node.ManagedNodeEvent, hostNum), + location: *loc, + } + + lowerBound := float64(math.MaxUint64) + upperBound := float64(-1) + for i := 0; i < hostNum; i++ { + n := createRandomNode(i, loc) + ne := runtime.NewNodeEvent(n, runtime.Added) + mgmtNE := node.NewManagedNodeEvent(ne, loc) + hashValue := float64(hash.HashStrToUInt64(mgmtNE.GetId())) / math.MaxUint64 * 360 + vs.nodeEventByHash[hashValue] = mgmtNE + + if lowerBound > hashValue { + lowerBound = hashValue + } + if upperBound < hashValue { + upperBound = hashValue + } + } + + vs.lowerbound = lowerBound + // upperBound is exclusive, increase a bit + vs.upperbound = upperBound + 1 + + return vs +} + +func createRandomNode(rv int, loc *location.Location) *types.LogicalNode { + id := uuid.New() + return &types.LogicalNode{ + Id: id.String(), + ResourceVersion: strconv.Itoa(rv), + GeoInfo: types.NodeGeoInfo{ + Region: types.RegionName(loc.GetRegion()), + ResourcePartition: types.ResourcePartitionName(loc.GetResourcePartition()), + }, + } +} From b90dcd9cffe8deaec26fc6a3807ab941dfeda632 Mon Sep 17 00:00:00 2001 From: Ying Huang Date: Thu, 13 Oct 2022 18:13:49 +0000 Subject: [PATCH 4/7] Support event watch for vNode split 1st case --- .../pkg/distributor/distributor.go | 10 ++++-- .../distributor_concurrency_test.go | 4 +-- .../pkg/distributor/distributor_test.go | 8 ++--- .../pkg/distributor/storage/nodestore.go | 35 ++++++++++++++++++- .../pkg/distributor/storage/nodestore_test.go | 2 ++ 5 files changed, 50 insertions(+), 9 deletions(-) diff --git a/resource-management/pkg/distributor/distributor.go b/resource-management/pkg/distributor/distributor.go index ab41c71..57d5d93 100644 --- a/resource-management/pkg/distributor/distributor.go +++ b/resource-management/pkg/distributor/distributor.go @@ -131,15 +131,21 @@ func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requested assignedHostCount := 0 hostAssignIsOK := false for i := 0; i < len(storesToSelectInorder); i++ { - newHostCount := assignedHostCount + (*storesToSelectInorder[i]).GetHostNum() + candidateStore := storesToSelectInorder[i] + + newHostCount := assignedHostCount + candidateStore.GetHostNum() if newHostCount <= requestedHostNum { - selectedStores = append(selectedStores, storesToSelectInorder[i]) + selectedStores = append(selectedStores, candidateStore) assignedHostCount = newHostCount if newHostCount == requestedHostNum { hostAssignIsOK = true break } } else { // split vNode + candidateStore.AdjustCapacity(candidateStore, requestedHostNum-assignedHostCount) + selectedStores = append(selectedStores, candidateStore) + assignedHostCount = requestedHostNum + hostAssignIsOK = true break } } diff --git a/resource-management/pkg/distributor/distributor_concurrency_test.go b/resource-management/pkg/distributor/distributor_concurrency_test.go index 82ffdd5..9b2b6a6 100644 --- a/resource-management/pkg/distributor/distributor_concurrency_test.go +++ b/resource-management/pkg/distributor/distributor_concurrency_test.go @@ -112,7 +112,7 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, latestRVs) - assert.True(t, len(nodes) >= tt.hostPerClient) + assert.Equal(t, tt.hostPerClient, len(nodes)) // t.Logf("Client %d %s latest rvs: %v.Total hosts: %d\n", i, clientId, latestRVs, len(nodes)) latestRVsByClient[i] = latestRVs nodesByClient[i] = nodes @@ -297,7 +297,7 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, latestRVs) - assert.True(t, len(nodes) >= tt.hostPerClient) + assert.Equal(t, tt.hostPerClient, len(nodes)) // t.Logf("Client %d %s latest rvs: %v.Total hosts: %d\n", i, clientId, latestRVs, len(nodes)) latestRVsByClient[i] = latestRVs nodesByClient[i] = nodes diff --git a/resource-management/pkg/distributor/distributor_test.go b/resource-management/pkg/distributor/distributor_test.go index 1d44c7e..bf3b6f6 100644 --- a/resource-management/pkg/distributor/distributor_test.go +++ b/resource-management/pkg/distributor/distributor_test.go @@ -370,7 +370,7 @@ func TestRegisterClient_WithinLimit(t *testing.T) { hostCount += vs.GetHostNum() } t.Logf("Total %d hosts are assigned to client %s\nTook %v to register the client.\n", hostCount, clientId, duration) - assert.True(t, hostCount >= requestedHostNum, "Assigned host number %d is less than requested %d", hostCount, requestedHostNum) + assert.Equal(t, requestedHostNum, hostCount, "Assigned host number %d is less than requested %d", hostCount, requestedHostNum) // check nodes number with list nodes nodes, _, err := distributor.ListNodesForClient(clientId) @@ -411,7 +411,7 @@ func TestRegistrationWorkflow(t *testing.T) { nodes, latestRVs, err := distributor.ListNodesForClient(clientId) assert.Nil(t, err) assert.NotNil(t, latestRVs) - assert.True(t, len(nodes) >= 500) + assert.Equal(t, requestedHostNum, len(nodes)) t.Logf("Latest rvs: %v. Total hosts: %d\n", latestRVs, len(nodes)) // check each node event nodeIds := make(map[string]bool) @@ -536,7 +536,7 @@ func TestRegistration5MCase(t *testing.T) { assert.Nil(t, err2) assert.NotNil(t, latestRVs) - assert.True(t, len(nodes) >= tt.hostPerScheduler) + assert.Equal(t, tt.hostPerScheduler, len(nodes)) klog.Infof("List nodes for client %s took %v", clientId, duration) } @@ -576,7 +576,7 @@ func TestWatchRenewal(t *testing.T) { nodes, latestRVs, err := distributor.ListNodesForClient(clientId) assert.Nil(t, err) assert.NotNil(t, latestRVs) - assert.True(t, len(nodes) >= 500) + assert.Equal(t, requestedHostNum, len(nodes)) t.Logf("Latest rvs: %v. Total hosts: %d\n", latestRVs, len(nodes)) // check each node event nodeIds := make(map[string]bool) diff --git a/resource-management/pkg/distributor/storage/nodestore.go b/resource-management/pkg/distributor/storage/nodestore.go index f9ab18b..7bf1da9 100644 --- a/resource-management/pkg/distributor/storage/nodestore.go +++ b/resource-management/pkg/distributor/storage/nodestore.go @@ -17,6 +17,8 @@ limitations under the License. package storage import ( + "bytes" + "fmt" "k8s.io/klog/v2" "math" "sort" @@ -330,6 +332,21 @@ func (vs *VirtualNodeStore) Swap(i, j int) { vs.splittVirtualNodeStores[i], vs.splittVirtualNodeStores[j] = vs.splittVirtualNodeStores[j], vs.splittVirtualNodeStores[i] } +func (vs *VirtualNodeStore) GetBoundaris() string { + vs.mu.RLock() + defer vs.mu.RUnlock() + + result := new(bytes.Buffer) + result.WriteString(fmt.Sprintf("original boundary (%v, %v), adjusted boundary (%v, %v)\n", + vs.lowerbound, vs.upperbound, vs.adjustedLowerBound, vs.adjustedUpperBound)) + for i := 0; i < len(vs.splittVirtualNodeStores); i++ { + result.WriteString(fmt.Sprintf("Child store %d, boundary (%v, %v)\n", + i, vs.splittVirtualNodeStores[i].adjustedLowerBound, vs.splittVirtualNodeStores[i].adjustedUpperBound)) + } + + return string(result.Bytes()) +} + type NodeStore struct { // granularity of the ring - degree for each virtual node managed arc granularOfRing float64 @@ -571,7 +588,23 @@ func (ns *NodeStore) getNodeHash(node *node.ManagedNodeEvent) (float64, int) { func (ns *NodeStore) getVirtualNodeStore(node *node.ManagedNodeEvent) (float64, int, *VirtualNodeStore) { hashValue, ringId := ns.getNodeHash(node) virtualNodeIndex := int(math.Floor(hashValue / ns.granularOfRing)) - return hashValue, ringId, (*ns.vNodeStores)[virtualNodeIndex] + parentVNS := (*ns.vNodeStores)[virtualNodeIndex] + if len(parentVNS.splittVirtualNodeStores) > 0 && (hashValue < parentVNS.adjustedLowerBound || hashValue >= parentVNS.adjustedUpperBound) { + // prevent node splitting happening during search + parentVNS.mu.RLock() + defer parentVNS.mu.RUnlock() + // node shall locate in child VirtualNodeStore + // linear search first, might need to change to binary search based on performance + for i := 0; i < len(parentVNS.splittVirtualNodeStores); i++ { + if hashValue >= parentVNS.splittVirtualNodeStores[i].adjustedLowerBound && hashValue < parentVNS.splittVirtualNodeStores[i].adjustedUpperBound { + return hashValue, ringId, parentVNS.splittVirtualNodeStores[i] + } + } + + klog.Errorf("Could not find node position in virtual node stores. HashValue %v, VS %s", hashValue, parentVNS.GetBoundaris()) + } + + return hashValue, ringId, parentVNS } func (ns *NodeStore) addNodeToRing(nodeEvent *node.ManagedNodeEvent) (isNewNode bool) { diff --git a/resource-management/pkg/distributor/storage/nodestore_test.go b/resource-management/pkg/distributor/storage/nodestore_test.go index 647b943..cc0a104 100644 --- a/resource-management/pkg/distributor/storage/nodestore_test.go +++ b/resource-management/pkg/distributor/storage/nodestore_test.go @@ -64,6 +64,8 @@ func TestAdjustCapacity_1stSplit(t *testing.T) { assert.Equal(t, vsPointerOld, vs) assert.Nil(t, vs.splittVirtualNodeStores[0].parentVirtualNodeStore) assert.Equal(t, vs, vs.splittVirtualNodeStores[1].parentVirtualNodeStore) + + assert.Equal(t, len(vs.nodeEventByHash), vs.GetHostNum()) } } } From 7bb30f943eba46b753f3aca38d27aeb703a3e2b6 Mon Sep 17 00:00:00 2001 From: Ying Huang Date: Fri, 14 Oct 2022 22:04:04 +0000 Subject: [PATCH 5/7] vNode split case 2 - further split and validation till all hosts are assigned --- .../pkg/distributor/distributor.go | 14 +- .../distributor_concurrency_test.go | 6 +- .../pkg/distributor/distributor_test.go | 97 ++++++----- .../pkg/distributor/storage/nodestore.go | 157 +++++++++++++++--- .../pkg/distributor/storage/nodestore_test.go | 13 +- 5 files changed, 203 insertions(+), 84 deletions(-) diff --git a/resource-management/pkg/distributor/distributor.go b/resource-management/pkg/distributor/distributor.go index 57d5d93..4b3082b 100644 --- a/resource-management/pkg/distributor/distributor.go +++ b/resource-management/pkg/distributor/distributor.go @@ -117,7 +117,7 @@ func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requested allStores := dis.defaultNodeStore.GetVirtualStores() freeStores := make(map[*storage.VirtualNodeStore]bool) for _, vs := range *allStores { - if vs.GetAssignedClient() == "" && vs.GetHostNum() > 0 { + if vs.GetFreeHostNum() > 0 { freeStores[vs] = true } } @@ -133,18 +133,18 @@ func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requested for i := 0; i < len(storesToSelectInorder); i++ { candidateStore := storesToSelectInorder[i] - newHostCount := assignedHostCount + candidateStore.GetHostNum() + newHostCount := assignedHostCount + candidateStore.GetFreeHostNum() if newHostCount <= requestedHostNum { - selectedStores = append(selectedStores, candidateStore) + selectedStores = append(selectedStores, candidateStore.GetAllFreeChildStores()...) assignedHostCount = newHostCount if newHostCount == requestedHostNum { hostAssignIsOK = true break } } else { // split vNode - candidateStore.AdjustCapacity(candidateStore, requestedHostNum-assignedHostCount) - selectedStores = append(selectedStores, candidateStore) - assignedHostCount = requestedHostNum + requestedStores := candidateStore.RequestCapacity(requestedHostNum - assignedHostCount) + selectedStores = append(selectedStores, requestedStores...) + assignedHostCount += candidateStore.GetHostNum() hostAssignIsOK = true break } @@ -302,7 +302,7 @@ func (dis *ResourceDistributor) persistVirtualNodesAssignment(clientId string, a vNodeToSave := &store.VirtualNodeConfig{ Location: s.GetLocation(), } - vNodeToSave.Lowerbound, vNodeToSave.Upperbound = s.GetRange() + vNodeToSave.Lowerbound, vNodeToSave.Upperbound = s.GetOriginalRange() vNodeConfigs[i] = vNodeToSave } assignment := &store.VirtualNodeAssignment{ diff --git a/resource-management/pkg/distributor/distributor_concurrency_test.go b/resource-management/pkg/distributor/distributor_concurrency_test.go index 9b2b6a6..12da569 100644 --- a/resource-management/pkg/distributor/distributor_concurrency_test.go +++ b/resource-management/pkg/distributor/distributor_concurrency_test.go @@ -45,21 +45,21 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) { updateEventNum: 1000, }, { - name: "Test 10K nodes with 5 clients each has 500 , each got 10K update events", + name: "Test 10K nodes with 5 clients each has 500 hosts, each got 10K update events", nodeNum: 10000, clientNum: 5, hostPerClient: 500, updateEventNum: 10000, }, { - name: "Test 10K nodes with 5 clients each has 500 , each got 100K update events", + name: "Test 10K nodes with 5 clients each has 500 hosts, each got 100K update events", nodeNum: 10000, clientNum: 5, hostPerClient: 500, updateEventNum: 100000, }, { - name: "Test 1M nodes with 50 clients each has 15000 , each got 100K update events", + name: "Test 1M nodes with 50 clients each has 15000 hosts, each got 100K update events", nodeNum: 1000000, clientNum: 50, hostPerClient: 15000, diff --git a/resource-management/pkg/distributor/distributor_test.go b/resource-management/pkg/distributor/distributor_test.go index bf3b6f6..dd9f6da 100644 --- a/resource-management/pkg/distributor/distributor_test.go +++ b/resource-management/pkg/distributor/distributor_test.go @@ -86,7 +86,7 @@ func TestDistributorInit(t *testing.T) { store := (*defaultNodeStores)[i] assert.Equal(t, 0, store.GetHostNum(), "Initial host number should be 0") assert.Equal(t, "", store.GetAssignedClient(), "Virtual store should not be assigned to any client") - lowerBound, upperBound := store.GetRange() + lowerBound, upperBound := store.GetOriginalRange() assert.Equal(t, lower, lowerBound, "Expecting lower bound %f but got %f. store id %d, hash range (%f, %f]", lower, lowerBound, i, lowerBound, upperBound) assert.NotEqual(t, lowerBound, upperBound, "Expecting lower bound not equal to upper bound for virtual store %d. Got hash range (%f, %f]", i, lowerBound, upperBound) lower = upperBound @@ -337,45 +337,68 @@ func TestRegisterClient_ErrorCases(t *testing.T) { } func TestRegisterClient_WithinLimit(t *testing.T) { - distributor := setUp() - defer tearDown() - - result, rvMap := distributor.ProcessEvents(generateAddNodeEvent(10000, defaultLocBeijing_RP1)) - assert.True(t, result) - assert.NotNil(t, rvMap) - assert.Equal(t, 10000, distributor.defaultNodeStore.GetTotalHostNum()) + for v := 2; v <= 500; v++ { + virutalStoreNumPerResourcePartition = v + distributor := setUp() + + totalHostCount := 10000 + result, rvMap := distributor.ProcessEvents(generateAddNodeEvent(totalHostCount, defaultLocBeijing_RP1)) + assert.True(t, result) + assert.NotNil(t, rvMap) + assert.Equal(t, totalHostCount, distributor.defaultNodeStore.GetTotalHostNum()) + + allHosts := make(map[string]bool, 0) + requestedHostNum := 500 + for i := 0; i < 20; i++ { + start := time.Now() + client := types.Client{ClientId: uuid.New().String(), Resource: types.ResourceRequest{TotalMachines: requestedHostNum}, ClientInfo: types.ClientInfoType{}} + err := distributor.RegisterClient(&client) + duration := time.Since(start) + + clientId := client.ClientId + assert.NotNil(t, clientId, "Expecting not nil client id") + assert.False(t, clientId == "", "Expecting non empty client id") + assert.Nil(t, err, "Expecting nil error") + + // check virtual node assignment + virtualStoresAssignedToClient, isOK := distributor.clientToStores[clientId] + assert.True(t, isOK, "Expecting get virtual stores assigned to client %s", clientId) + assert.True(t, len(virtualStoresAssignedToClient) > 0, "Expecting get non empty virtual stores assigned to client %s", clientId) + hostCount := 0 + for j := 0; j < len(virtualStoresAssignedToClient); j++ { + vs := virtualStoresAssignedToClient[j] + assert.Equal(t, clientId, vs.GetAssignedClient(), "Unexpected virtual store client id %s", clientId) + lower, upper := vs.GetAdjustedRange() + t.Logf("Virtual node store (%f, %f] is assigned to client %s, host number %d. vNodePerPR %v", lower, upper, clientId, vs.GetHostNum(), virutalStoreNumPerResourcePartition) + if !vs.IsValidTopVirtualNodeStore() { + t.Fatalf("Invalid vNode store for client %v", clientId) + } + hostCount += vs.GetHostNum() + } + t.Logf("Total %d hosts are assigned to client %s\nTook %v to register the client.\n", hostCount, clientId, duration) + if requestedHostNum != hostCount { + assert.Equal(t, requestedHostNum, hostCount, "Assigned host number %d is not equal to requested %d", hostCount, requestedHostNum) + t.Fatalf("virutalStoreNumPerResourcePartition = %d, client count %v", virutalStoreNumPerResourcePartition, i) + } - requestedHostNum := 500 - for i := 0; i < 10; i++ { - start := time.Now() - client := types.Client{ClientId: uuid.New().String(), Resource: types.ResourceRequest{TotalMachines: requestedHostNum}, ClientInfo: types.ClientInfoType{}} - err := distributor.RegisterClient(&client) - duration := time.Since(start) - - clientId := client.ClientId - assert.NotNil(t, clientId, "Expecting not nil client id") - assert.False(t, clientId == "", "Expecting non empty client id") - assert.Nil(t, err, "Expecting nil error") - - // check virtual node assignment - virtualStoresAssignedToClient, isOK := distributor.clientToStores[clientId] - assert.True(t, isOK, "Expecting get virtual stores assigned to client %s", clientId) - assert.True(t, len(virtualStoresAssignedToClient) > 0, "Expecting get non empty virtual stores assigned to client %s", clientId) - hostCount := 0 - for i := 0; i < len(virtualStoresAssignedToClient); i++ { - vs := virtualStoresAssignedToClient[i] - assert.Equal(t, clientId, vs.GetAssignedClient(), "Unexpected virtual store client id %s", clientId) - lower, upper := vs.GetRange() - t.Logf("Virtual node store (%f, %f] is assigned to client %s, host number %d\n", lower, upper, clientId, vs.GetHostNum()) - hostCount += vs.GetHostNum() + // check nodes number with list nodes + nodes, _, err := distributor.ListNodesForClient(clientId) + assert.Nil(t, err, "List nodes by client id should be successful") + assert.Equal(t, hostCount, len(nodes), "Node count from virtual store should be same as list nodes") + + // check node allocation and make sure there is no duplicates + for _, n := range nodes { + if _, isOK := allHosts[n.Id]; isOK { + t.Fatalf("Host was assigned to another client but reassigned again, id %v", n.Id) + } else { + allHosts[n.Id] = true + } + } } - t.Logf("Total %d hosts are assigned to client %s\nTook %v to register the client.\n", hostCount, clientId, duration) - assert.Equal(t, requestedHostNum, hostCount, "Assigned host number %d is less than requested %d", hostCount, requestedHostNum) - // check nodes number with list nodes - nodes, _, err := distributor.ListNodesForClient(clientId) - assert.Nil(t, err, "List nodes by client id should be successful") - assert.Equal(t, hostCount, len(nodes), "Node count from virtual store should be same as list nodes") + assert.Equal(t, totalHostCount, len(allHosts)) + + tearDown() } } diff --git a/resource-management/pkg/distributor/storage/nodestore.go b/resource-management/pkg/distributor/storage/nodestore.go index 7bf1da9..b78c56f 100644 --- a/resource-management/pkg/distributor/storage/nodestore.go +++ b/resource-management/pkg/distributor/storage/nodestore.go @@ -78,6 +78,82 @@ func (vs *VirtualNodeStore) GetHostNum() int { return len(vs.nodeEventByHash) } +func (vs *VirtualNodeStore) GetFreeHostNum() int { + if len(vs.splittVirtualNodeStores) > 0 { + freeHostCount := 0 + for i := 0; i < len(vs.splittVirtualNodeStores); i++ { + if !vs.splittVirtualNodeStores[i].IsAssignedToClient() { + vs.splittVirtualNodeStores[i].mu.RLock() + freeHostCount += len(vs.splittVirtualNodeStores[i].nodeEventByHash) + vs.splittVirtualNodeStores[i].mu.RUnlock() + } + } + return freeHostCount + } else if vs.IsAssignedToClient() { + return 0 + } else { + return vs.GetHostNum() + } +} + +func (vs *VirtualNodeStore) GetAllFreeChildStores() []*VirtualNodeStore { + if len(vs.splittVirtualNodeStores) > 0 { + freeStores := make([]*VirtualNodeStore, 0) + for i := 0; i < len(vs.splittVirtualNodeStores); i++ { + if !vs.splittVirtualNodeStores[i].IsAssignedToClient() { + freeStores = append(freeStores, vs.splittVirtualNodeStores[i]) + } + } + return freeStores + } else if vs.IsAssignedToClient() { + return nil + } else { + return []*VirtualNodeStore{vs} + } +} + +func (vs *VirtualNodeStore) IsValidTopVirtualNodeStore() bool { + if vs.parentVirtualNodeStore != nil { + return false + } + + storeCount := len(vs.splittVirtualNodeStores) + if storeCount == 0 { + return true + } else if storeCount == 1 { + return false + } + + // check lower/upperbound of top vNode + if vs.lowerbound != vs.splittVirtualNodeStores[0].adjustedLowerBound || vs.upperbound != vs.splittVirtualNodeStores[storeCount-1].adjustedUpperBound { + return false + } + // check adjusted lower/upperbound + if vs.splittVirtualNodeStores[0].adjustedLowerBound >= vs.splittVirtualNodeStores[0].adjustedUpperBound { + return false + } + isParentVNodeFound := false + previousUpperBound := vs.splittVirtualNodeStores[0].adjustedUpperBound + for i := 0; i < storeCount; i++ { + currentVNode := vs.splittVirtualNodeStores[i] + if currentVNode.parentVirtualNodeStore == nil { + if !isParentVNodeFound { + isParentVNodeFound = true + } else { + return false + } + if i > 0 { + if previousUpperBound != currentVNode.adjustedLowerBound || currentVNode.adjustedLowerBound >= currentVNode.adjustedUpperBound { + return false + } + previousUpperBound = currentVNode.adjustedUpperBound + } + } + } + + return true +} + func (vs *VirtualNodeStore) GetLocation() location.Location { return vs.location } @@ -86,6 +162,13 @@ func (vs *VirtualNodeStore) GetAssignedClient() string { return vs.clientId } +func (vs *VirtualNodeStore) IsAssignedToClient() bool { + if vs.clientId != "" { + return true + } + return false +} + func (vs *VirtualNodeStore) AssignToClient(clientId string, eventQueue *cache.NodeEventQueue) bool { if vs.clientId != "" { return false @@ -104,10 +187,17 @@ func (vs *VirtualNodeStore) Release() { vs.clientId = "" } -func (vs *VirtualNodeStore) GetRange() (float64, float64) { +func (vs *VirtualNodeStore) GetOriginalRange() (float64, float64) { return vs.lowerbound, vs.upperbound } +func (vs *VirtualNodeStore) GetAdjustedRange() (float64, float64) { + if vs.parentVirtualNodeStore == nil && len(vs.splittVirtualNodeStores) == 0 { // top level vNode never being splitted + return vs.GetOriginalRange() + } + return vs.adjustedLowerBound, vs.adjustedUpperBound +} + // Snapshot generates a list of node for the List() call from a client, and a current RV map to client func (vs *VirtualNodeStore) SnapShot() ([]*types.LogicalNode, types.TransitResourceVersionMap) { vs.mu.RLock() @@ -146,30 +236,25 @@ func (vs *VirtualNodeStore) GenerateBookmarkEvent() *node.ManagedNodeEvent { // Input requested host count // TODO: error cases -func (vs *VirtualNodeStore) AdjustCapacity(vNStoAdjust *VirtualNodeStore, requestedHostCount int) { - vs.mu.Lock() - defer vs.mu.Unlock() +func (vs *VirtualNodeStore) RequestCapacity(requestedHostCount int) []*VirtualNodeStore { + vs.mu.RLock() + defer vs.mu.RUnlock() - currentSize := len(vNStoAdjust.nodeEventByHash) - if currentSize <= requestedHostCount { - klog.Errorf("Requested to split virtual node when capacity (%d) is less than or equal to requested host count (%d)", currentSize, requestedHostCount) - return + freeHostNum := vs.GetFreeHostNum() + if freeHostNum <= requestedHostCount { + klog.Errorf("Free host count (%d) is less than or equal to requested host count (%d)", freeHostNum, requestedHostCount) + return nil } if vs.parentVirtualNodeStore != nil { - klog.Error("VS capacity adjustment can only be done from parent node") - return + klog.Error("Request free host can only be done from parent node") + return nil } if len(vs.splittVirtualNodeStores) == 0 { // first splitted vs - if vs != vNStoAdjust { - klog.Error("Invalid virtual node status: first time split with unmatched virtual node stores") - return - } - // create new virtual node store newVSStore := &VirtualNodeStore{ mu: sync.RWMutex{}, - nodeEventByHash: make(map[float64]*node.ManagedNodeEvent, currentSize-requestedHostCount), + nodeEventByHash: make(map[float64]*node.ManagedNodeEvent, freeHostNum-requestedHostCount), location: vs.location, parentVirtualNodeStore: vs, adjustedLowerBound: vs.lowerbound, @@ -177,23 +262,43 @@ func (vs *VirtualNodeStore) AdjustCapacity(vNStoAdjust *VirtualNodeStore, reques } vs.adjustedLowerBound = vs.lowerbound vs.adjustedUpperBound = vs.upperbound - vs.moveNodes(vs, newVSStore, currentSize-requestedHostCount, true) + vs.moveNodes(vs, newVSStore, freeHostNum-requestedHostCount, true) // Add new virtual node into store vs.splittVirtualNodeStores = make([]*VirtualNodeStore, 2) vs.splittVirtualNodeStores[0] = vs vs.splittVirtualNodeStores[1] = newVSStore + return []*VirtualNodeStore{vs} } else { - currentIndex := vs.findIndexOfVS(vNStoAdjust) - if currentIndex < 0 { - klog.Error("Invalid VirtualNodeStore config") - return + freeStores := vs.GetAllFreeChildStores() + candidateStores := make([]*VirtualNodeStore, 0) + count := 0 + for i := 0; i < len(freeStores); i++ { + newHostCount := count + freeStores[i].GetHostNum() + if newHostCount == requestedHostCount { + return append(candidateStores, freeStores[i]) + } else if newHostCount < requestedHostCount { + candidateStores = append(candidateStores, freeStores[i]) + } else { + newVSStore := &VirtualNodeStore{ + mu: sync.RWMutex{}, + nodeEventByHash: make(map[float64]*node.ManagedNodeEvent, newHostCount-requestedHostCount), + location: vs.location, + parentVirtualNodeStore: vs, + adjustedLowerBound: freeStores[i].adjustedLowerBound, + adjustedUpperBound: freeStores[i].adjustedUpperBound, + } + vs.moveNodes(freeStores[i], newVSStore, newHostCount-requestedHostCount, true) + + vs.splittVirtualNodeStores = append(vs.splittVirtualNodeStores, newVSStore) + sort.Sort(vs) + return append(candidateStores, freeStores[i]) + } } - newVSStore, countFromHighEnd := vs.findAdjacentVacantStore(currentIndex) - vs.moveNodes(vNStoAdjust, newVSStore, currentSize-requestedHostCount, countFromHighEnd) - // sort splittVirtualNodeStores by range - sort.Sort(vs) + // this should never be reached + klog.Error("Unexpected statement reach. Free host #: %v, requested host #: %v", freeHostNum, requestedHostCount) + return candidateStores } } @@ -427,7 +532,7 @@ func (ns *NodeStore) CheckFreeCapacity(requestedHostNum int) bool { defer ns.nsLock.Unlock() allocatableHostNum := 0 for _, vs := range *ns.vNodeStores { - allocatableHostNum += vs.GetHostNum() + allocatableHostNum += vs.GetFreeHostNum() if allocatableHostNum >= requestedHostNum { return true } diff --git a/resource-management/pkg/distributor/storage/nodestore_test.go b/resource-management/pkg/distributor/storage/nodestore_test.go index cc0a104..6f4ebe4 100644 --- a/resource-management/pkg/distributor/storage/nodestore_test.go +++ b/resource-management/pkg/distributor/storage/nodestore_test.go @@ -38,33 +38,24 @@ func TestAdjustCapacity_1stSplit(t *testing.T) { upperBoundOld := vs.upperbound vsPointerOld := vs - vs.AdjustCapacity(vs, j) + vs.RequestCapacity(j) // check result assert.Nil(t, vs.parentVirtualNodeStore) assert.Equal(t, j, len(vs.nodeEventByHash)) + assert.True(t, vs.IsValidTopVirtualNodeStore()) assert.Equal(t, 2, len(vs.splittVirtualNodeStores)) assert.NotEqual(t, vs.splittVirtualNodeStores[0], vs.splittVirtualNodeStores[1]) assert.Equal(t, i, len(vs.splittVirtualNodeStores[0].nodeEventByHash)+len(vs.splittVirtualNodeStores[1].nodeEventByHash)) - assert.True(t, vs.splittVirtualNodeStores[0].adjustedLowerBound < vs.splittVirtualNodeStores[1].adjustedLowerBound) - assert.True(t, vs.splittVirtualNodeStores[0].adjustedUpperBound < vs.splittVirtualNodeStores[1].adjustedUpperBound) - assert.Equal(t, vs.splittVirtualNodeStores[0].adjustedUpperBound, vs.splittVirtualNodeStores[1].adjustedLowerBound) - // init vs has immutable lower & upper bound assert.Equal(t, lowerBoundOld, vs.lowerbound) assert.Equal(t, upperBoundOld, vs.upperbound) - assert.Equal(t, vs.lowerbound, vs.splittVirtualNodeStores[0].adjustedLowerBound) - assert.Equal(t, vs.upperbound, vs.splittVirtualNodeStores[1].adjustedUpperBound) vs.mu.Lock() vs.mu.Unlock() - // has one and only one store as parent assert.Equal(t, vsPointerOld, vs) - assert.Nil(t, vs.splittVirtualNodeStores[0].parentVirtualNodeStore) - assert.Equal(t, vs, vs.splittVirtualNodeStores[1].parentVirtualNodeStore) - assert.Equal(t, len(vs.nodeEventByHash), vs.GetHostNum()) } } From 59849ee961e047b3ef571cb288a76394dc4942f8 Mon Sep 17 00:00:00 2001 From: Ying Huang Date: Mon, 17 Oct 2022 16:34:28 +0000 Subject: [PATCH 6/7] Validate all assigned virtual stores --- .../pkg/distributor/distributor_test.go | 6 ++++-- .../pkg/distributor/storage/nodestore.go | 19 +++++++++++-------- .../pkg/distributor/storage/nodestore_test.go | 2 +- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/resource-management/pkg/distributor/distributor_test.go b/resource-management/pkg/distributor/distributor_test.go index dd9f6da..8a8dfe6 100644 --- a/resource-management/pkg/distributor/distributor_test.go +++ b/resource-management/pkg/distributor/distributor_test.go @@ -337,7 +337,9 @@ func TestRegisterClient_ErrorCases(t *testing.T) { } func TestRegisterClient_WithinLimit(t *testing.T) { - for v := 2; v <= 500; v++ { + vNodeNumberPerRPToTest := []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 50, 51, 100, 101, 200, 201, 500, 501} + for _, v := range vNodeNumberPerRPToTest { + t.Logf("Test register client with virutalStoreNumPerResourcePartition = %d", v) virutalStoreNumPerResourcePartition = v distributor := setUp() @@ -370,7 +372,7 @@ func TestRegisterClient_WithinLimit(t *testing.T) { assert.Equal(t, clientId, vs.GetAssignedClient(), "Unexpected virtual store client id %s", clientId) lower, upper := vs.GetAdjustedRange() t.Logf("Virtual node store (%f, %f] is assigned to client %s, host number %d. vNodePerPR %v", lower, upper, clientId, vs.GetHostNum(), virutalStoreNumPerResourcePartition) - if !vs.IsValidTopVirtualNodeStore() { + if !vs.IsValidVirtualNodeStore() { t.Fatalf("Invalid vNode store for client %v", clientId) } hostCount += vs.GetHostNum() diff --git a/resource-management/pkg/distributor/storage/nodestore.go b/resource-management/pkg/distributor/storage/nodestore.go index b78c56f..45b455a 100644 --- a/resource-management/pkg/distributor/storage/nodestore.go +++ b/resource-management/pkg/distributor/storage/nodestore.go @@ -112,9 +112,12 @@ func (vs *VirtualNodeStore) GetAllFreeChildStores() []*VirtualNodeStore { } } -func (vs *VirtualNodeStore) IsValidTopVirtualNodeStore() bool { +func (vs *VirtualNodeStore) IsValidVirtualNodeStore() bool { if vs.parentVirtualNodeStore != nil { - return false + if vs.parentVirtualNodeStore.parentVirtualNodeStore != nil { // Only two levels are allowed + return false + } + return vs.parentVirtualNodeStore.IsValidVirtualNodeStore() } storeCount := len(vs.splittVirtualNodeStores) @@ -142,16 +145,16 @@ func (vs *VirtualNodeStore) IsValidTopVirtualNodeStore() bool { } else { return false } - if i > 0 { - if previousUpperBound != currentVNode.adjustedLowerBound || currentVNode.adjustedLowerBound >= currentVNode.adjustedUpperBound { - return false - } - previousUpperBound = currentVNode.adjustedUpperBound + } + if i > 0 { + if previousUpperBound != currentVNode.adjustedLowerBound || currentVNode.adjustedLowerBound >= currentVNode.adjustedUpperBound { + return false } + previousUpperBound = currentVNode.adjustedUpperBound } } - return true + return isParentVNodeFound } func (vs *VirtualNodeStore) GetLocation() location.Location { diff --git a/resource-management/pkg/distributor/storage/nodestore_test.go b/resource-management/pkg/distributor/storage/nodestore_test.go index 6f4ebe4..8f25ed6 100644 --- a/resource-management/pkg/distributor/storage/nodestore_test.go +++ b/resource-management/pkg/distributor/storage/nodestore_test.go @@ -43,7 +43,7 @@ func TestAdjustCapacity_1stSplit(t *testing.T) { // check result assert.Nil(t, vs.parentVirtualNodeStore) assert.Equal(t, j, len(vs.nodeEventByHash)) - assert.True(t, vs.IsValidTopVirtualNodeStore()) + assert.True(t, vs.IsValidVirtualNodeStore()) assert.Equal(t, 2, len(vs.splittVirtualNodeStores)) assert.NotEqual(t, vs.splittVirtualNodeStores[0], vs.splittVirtualNodeStores[1]) From 49fccc306cdb79b74631b09978f4dd3afe752926 Mon Sep 17 00:00:00 2001 From: Ying Huang Date: Tue, 18 Oct 2022 17:23:28 +0000 Subject: [PATCH 7/7] Update UTs --- .../distributor_concurrency_test.go | 22 +++++++++---------- .../pkg/distributor/distributor_test.go | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/resource-management/pkg/distributor/distributor_concurrency_test.go b/resource-management/pkg/distributor/distributor_concurrency_test.go index 12da569..2a6c32c 100644 --- a/resource-management/pkg/distributor/distributor_concurrency_test.go +++ b/resource-management/pkg/distributor/distributor_concurrency_test.go @@ -52,17 +52,17 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) { updateEventNum: 10000, }, { - name: "Test 10K nodes with 5 clients each has 500 hosts, each got 100K update events", + name: "Test 10K nodes with 20 clients each has 500 hosts, each got 100K update events", nodeNum: 10000, - clientNum: 5, + clientNum: 20, hostPerClient: 500, updateEventNum: 100000, }, { - name: "Test 1M nodes with 50 clients each has 15000 hosts, each got 100K update events", + name: "Test 1M nodes with 50 clients each has 20000 hosts, each got 100K update events", nodeNum: 1000000, clientNum: 50, - hostPerClient: 15000, + hostPerClient: 20000, updateEventNum: 100000, }, } @@ -217,13 +217,13 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) { updateEventNum: 10000, }, { - name: "Test 6 region, each has 20 RPs, 40K hosts per RP, 4.8M nodes with 200 clients, each got 20K hosts, 20K update events", - regionNum: 6, - rpNum: 20, - hostPerRP: 40000, - clientNum: 200, - hostPerClient: 20000, - updateEventNum: 20000, + name: "Test 5 region, each has 40 RPs, 25K hosts per RP, 5M nodes with 100 clients, each got 50K hosts, 50K update events", + regionNum: 5, + rpNum: 40, + hostPerRP: 25000, + clientNum: 100, + hostPerClient: 50000, + updateEventNum: 50000, }, } diff --git a/resource-management/pkg/distributor/distributor_test.go b/resource-management/pkg/distributor/distributor_test.go index 8a8dfe6..993d6b8 100644 --- a/resource-management/pkg/distributor/distributor_test.go +++ b/resource-management/pkg/distributor/distributor_test.go @@ -570,7 +570,7 @@ func TestRegistration5MCase(t *testing.T) { } wg.Wait() - assert.Equal(t, 1, totalErrors) + assert.Equal(t, 0, totalErrors) t.Logf("%s succeed", tt.name) }) }