diff --git a/resource-management/pkg/distributor/distributor.go b/resource-management/pkg/distributor/distributor.go index 5974432..4b3082b 100644 --- a/resource-management/pkg/distributor/distributor.go +++ b/resource-management/pkg/distributor/distributor.go @@ -117,7 +117,7 @@ func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requested allStores := dis.defaultNodeStore.GetVirtualStores() freeStores := make(map[*storage.VirtualNodeStore]bool) for _, vs := range *allStores { - if vs.GetAssignedClient() == "" && vs.GetHostNum() > 0 { + if vs.GetFreeHostNum() > 0 { freeStores[vs] = true } } @@ -131,9 +131,20 @@ func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requested assignedHostCount := 0 hostAssignIsOK := false for i := 0; i < len(storesToSelectInorder); i++ { - selectedStores = append(selectedStores, storesToSelectInorder[i]) - assignedHostCount += (*storesToSelectInorder[i]).GetHostNum() - if assignedHostCount >= requestedHostNum { + candidateStore := storesToSelectInorder[i] + + newHostCount := assignedHostCount + candidateStore.GetFreeHostNum() + if newHostCount <= requestedHostNum { + selectedStores = append(selectedStores, candidateStore.GetAllFreeChildStores()...) + assignedHostCount = newHostCount + if newHostCount == requestedHostNum { + hostAssignIsOK = true + break + } + } else { // split vNode + requestedStores := candidateStore.RequestCapacity(requestedHostNum - assignedHostCount) + selectedStores = append(selectedStores, requestedStores...) + assignedHostCount += candidateStore.GetHostNum() hostAssignIsOK = true break } @@ -291,7 +302,7 @@ func (dis *ResourceDistributor) persistVirtualNodesAssignment(clientId string, a vNodeToSave := &store.VirtualNodeConfig{ Location: s.GetLocation(), } - vNodeToSave.Lowerbound, vNodeToSave.Upperbound = s.GetRange() + vNodeToSave.Lowerbound, vNodeToSave.Upperbound = s.GetOriginalRange() vNodeConfigs[i] = vNodeToSave } assignment := &store.VirtualNodeAssignment{ diff --git a/resource-management/pkg/distributor/distributor_concurrency_test.go b/resource-management/pkg/distributor/distributor_concurrency_test.go index 82ffdd5..2a6c32c 100644 --- a/resource-management/pkg/distributor/distributor_concurrency_test.go +++ b/resource-management/pkg/distributor/distributor_concurrency_test.go @@ -45,24 +45,24 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) { updateEventNum: 1000, }, { - name: "Test 10K nodes with 5 clients each has 500 , each got 10K update events", + name: "Test 10K nodes with 5 clients each has 500 hosts, each got 10K update events", nodeNum: 10000, clientNum: 5, hostPerClient: 500, updateEventNum: 10000, }, { - name: "Test 10K nodes with 5 clients each has 500 , each got 100K update events", + name: "Test 10K nodes with 20 clients each has 500 hosts, each got 100K update events", nodeNum: 10000, - clientNum: 5, + clientNum: 20, hostPerClient: 500, updateEventNum: 100000, }, { - name: "Test 1M nodes with 50 clients each has 15000 , each got 100K update events", + name: "Test 1M nodes with 50 clients each has 20000 hosts, each got 100K update events", nodeNum: 1000000, clientNum: 50, - hostPerClient: 15000, + hostPerClient: 20000, updateEventNum: 100000, }, } @@ -112,7 +112,7 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, latestRVs) - assert.True(t, len(nodes) >= tt.hostPerClient) + assert.Equal(t, tt.hostPerClient, len(nodes)) // t.Logf("Client %d %s latest rvs: %v.Total hosts: %d\n", i, clientId, latestRVs, len(nodes)) latestRVsByClient[i] = latestRVs nodesByClient[i] = nodes @@ -217,13 +217,13 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) { updateEventNum: 10000, }, { - name: "Test 6 region, each has 20 RPs, 40K hosts per RP, 4.8M nodes with 200 clients, each got 20K hosts, 20K update events", - regionNum: 6, - rpNum: 20, - hostPerRP: 40000, - clientNum: 200, - hostPerClient: 20000, - updateEventNum: 20000, + name: "Test 5 region, each has 40 RPs, 25K hosts per RP, 5M nodes with 100 clients, each got 50K hosts, 50K update events", + regionNum: 5, + rpNum: 40, + hostPerRP: 25000, + clientNum: 100, + hostPerClient: 50000, + updateEventNum: 50000, }, } @@ -297,7 +297,7 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) { assert.Nil(t, err) assert.NotNil(t, latestRVs) - assert.True(t, len(nodes) >= tt.hostPerClient) + assert.Equal(t, tt.hostPerClient, len(nodes)) // t.Logf("Client %d %s latest rvs: %v.Total hosts: %d\n", i, clientId, latestRVs, len(nodes)) latestRVsByClient[i] = latestRVs nodesByClient[i] = nodes diff --git a/resource-management/pkg/distributor/distributor_test.go b/resource-management/pkg/distributor/distributor_test.go index 1d44c7e..993d6b8 100644 --- a/resource-management/pkg/distributor/distributor_test.go +++ b/resource-management/pkg/distributor/distributor_test.go @@ -86,7 +86,7 @@ func TestDistributorInit(t *testing.T) { store := (*defaultNodeStores)[i] assert.Equal(t, 0, store.GetHostNum(), "Initial host number should be 0") assert.Equal(t, "", store.GetAssignedClient(), "Virtual store should not be assigned to any client") - lowerBound, upperBound := store.GetRange() + lowerBound, upperBound := store.GetOriginalRange() assert.Equal(t, lower, lowerBound, "Expecting lower bound %f but got %f. store id %d, hash range (%f, %f]", lower, lowerBound, i, lowerBound, upperBound) assert.NotEqual(t, lowerBound, upperBound, "Expecting lower bound not equal to upper bound for virtual store %d. Got hash range (%f, %f]", i, lowerBound, upperBound) lower = upperBound @@ -337,45 +337,70 @@ func TestRegisterClient_ErrorCases(t *testing.T) { } func TestRegisterClient_WithinLimit(t *testing.T) { - distributor := setUp() - defer tearDown() - - result, rvMap := distributor.ProcessEvents(generateAddNodeEvent(10000, defaultLocBeijing_RP1)) - assert.True(t, result) - assert.NotNil(t, rvMap) - assert.Equal(t, 10000, distributor.defaultNodeStore.GetTotalHostNum()) + vNodeNumberPerRPToTest := []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 50, 51, 100, 101, 200, 201, 500, 501} + for _, v := range vNodeNumberPerRPToTest { + t.Logf("Test register client with virutalStoreNumPerResourcePartition = %d", v) + virutalStoreNumPerResourcePartition = v + distributor := setUp() + + totalHostCount := 10000 + result, rvMap := distributor.ProcessEvents(generateAddNodeEvent(totalHostCount, defaultLocBeijing_RP1)) + assert.True(t, result) + assert.NotNil(t, rvMap) + assert.Equal(t, totalHostCount, distributor.defaultNodeStore.GetTotalHostNum()) + + allHosts := make(map[string]bool, 0) + requestedHostNum := 500 + for i := 0; i < 20; i++ { + start := time.Now() + client := types.Client{ClientId: uuid.New().String(), Resource: types.ResourceRequest{TotalMachines: requestedHostNum}, ClientInfo: types.ClientInfoType{}} + err := distributor.RegisterClient(&client) + duration := time.Since(start) + + clientId := client.ClientId + assert.NotNil(t, clientId, "Expecting not nil client id") + assert.False(t, clientId == "", "Expecting non empty client id") + assert.Nil(t, err, "Expecting nil error") + + // check virtual node assignment + virtualStoresAssignedToClient, isOK := distributor.clientToStores[clientId] + assert.True(t, isOK, "Expecting get virtual stores assigned to client %s", clientId) + assert.True(t, len(virtualStoresAssignedToClient) > 0, "Expecting get non empty virtual stores assigned to client %s", clientId) + hostCount := 0 + for j := 0; j < len(virtualStoresAssignedToClient); j++ { + vs := virtualStoresAssignedToClient[j] + assert.Equal(t, clientId, vs.GetAssignedClient(), "Unexpected virtual store client id %s", clientId) + lower, upper := vs.GetAdjustedRange() + t.Logf("Virtual node store (%f, %f] is assigned to client %s, host number %d. vNodePerPR %v", lower, upper, clientId, vs.GetHostNum(), virutalStoreNumPerResourcePartition) + if !vs.IsValidVirtualNodeStore() { + t.Fatalf("Invalid vNode store for client %v", clientId) + } + hostCount += vs.GetHostNum() + } + t.Logf("Total %d hosts are assigned to client %s\nTook %v to register the client.\n", hostCount, clientId, duration) + if requestedHostNum != hostCount { + assert.Equal(t, requestedHostNum, hostCount, "Assigned host number %d is not equal to requested %d", hostCount, requestedHostNum) + t.Fatalf("virutalStoreNumPerResourcePartition = %d, client count %v", virutalStoreNumPerResourcePartition, i) + } - requestedHostNum := 500 - for i := 0; i < 10; i++ { - start := time.Now() - client := types.Client{ClientId: uuid.New().String(), Resource: types.ResourceRequest{TotalMachines: requestedHostNum}, ClientInfo: types.ClientInfoType{}} - err := distributor.RegisterClient(&client) - duration := time.Since(start) - - clientId := client.ClientId - assert.NotNil(t, clientId, "Expecting not nil client id") - assert.False(t, clientId == "", "Expecting non empty client id") - assert.Nil(t, err, "Expecting nil error") - - // check virtual node assignment - virtualStoresAssignedToClient, isOK := distributor.clientToStores[clientId] - assert.True(t, isOK, "Expecting get virtual stores assigned to client %s", clientId) - assert.True(t, len(virtualStoresAssignedToClient) > 0, "Expecting get non empty virtual stores assigned to client %s", clientId) - hostCount := 0 - for i := 0; i < len(virtualStoresAssignedToClient); i++ { - vs := virtualStoresAssignedToClient[i] - assert.Equal(t, clientId, vs.GetAssignedClient(), "Unexpected virtual store client id %s", clientId) - lower, upper := vs.GetRange() - t.Logf("Virtual node store (%f, %f] is assigned to client %s, host number %d\n", lower, upper, clientId, vs.GetHostNum()) - hostCount += vs.GetHostNum() + // check nodes number with list nodes + nodes, _, err := distributor.ListNodesForClient(clientId) + assert.Nil(t, err, "List nodes by client id should be successful") + assert.Equal(t, hostCount, len(nodes), "Node count from virtual store should be same as list nodes") + + // check node allocation and make sure there is no duplicates + for _, n := range nodes { + if _, isOK := allHosts[n.Id]; isOK { + t.Fatalf("Host was assigned to another client but reassigned again, id %v", n.Id) + } else { + allHosts[n.Id] = true + } + } } - t.Logf("Total %d hosts are assigned to client %s\nTook %v to register the client.\n", hostCount, clientId, duration) - assert.True(t, hostCount >= requestedHostNum, "Assigned host number %d is less than requested %d", hostCount, requestedHostNum) - // check nodes number with list nodes - nodes, _, err := distributor.ListNodesForClient(clientId) - assert.Nil(t, err, "List nodes by client id should be successful") - assert.Equal(t, hostCount, len(nodes), "Node count from virtual store should be same as list nodes") + assert.Equal(t, totalHostCount, len(allHosts)) + + tearDown() } } @@ -411,7 +436,7 @@ func TestRegistrationWorkflow(t *testing.T) { nodes, latestRVs, err := distributor.ListNodesForClient(clientId) assert.Nil(t, err) assert.NotNil(t, latestRVs) - assert.True(t, len(nodes) >= 500) + assert.Equal(t, requestedHostNum, len(nodes)) t.Logf("Latest rvs: %v. Total hosts: %d\n", latestRVs, len(nodes)) // check each node event nodeIds := make(map[string]bool) @@ -536,7 +561,7 @@ func TestRegistration5MCase(t *testing.T) { assert.Nil(t, err2) assert.NotNil(t, latestRVs) - assert.True(t, len(nodes) >= tt.hostPerScheduler) + assert.Equal(t, tt.hostPerScheduler, len(nodes)) klog.Infof("List nodes for client %s took %v", clientId, duration) } @@ -545,7 +570,7 @@ func TestRegistration5MCase(t *testing.T) { } wg.Wait() - assert.Equal(t, 1, totalErrors) + assert.Equal(t, 0, totalErrors) t.Logf("%s succeed", tt.name) }) } @@ -576,7 +601,7 @@ func TestWatchRenewal(t *testing.T) { nodes, latestRVs, err := distributor.ListNodesForClient(clientId) assert.Nil(t, err) assert.NotNil(t, latestRVs) - assert.True(t, len(nodes) >= 500) + assert.Equal(t, requestedHostNum, len(nodes)) t.Logf("Latest rvs: %v. Total hosts: %d\n", latestRVs, len(nodes)) // check each node event nodeIds := make(map[string]bool) diff --git a/resource-management/pkg/distributor/storage/nodestore.go b/resource-management/pkg/distributor/storage/nodestore.go index b15d4c6..45b455a 100644 --- a/resource-management/pkg/distributor/storage/nodestore.go +++ b/resource-management/pkg/distributor/storage/nodestore.go @@ -17,8 +17,11 @@ limitations under the License. package storage import ( + "bytes" + "fmt" "k8s.io/klog/v2" "math" + "sort" "sync" "global-resource-service/resource-management/pkg/common-lib/hash" @@ -35,16 +38,37 @@ const ( BatchPersistSize = 100 ) +// Maximal two levels: +// 1. Original VS +// 2. Splitted VirtualNodeStores type VirtualNodeStore struct { - mu sync.RWMutex + mu sync.RWMutex + + // node events belong to current VS nodeEventByHash map[float64]*node.ManagedNodeEvent - lowerbound float64 - upperbound float64 + // lower bound - inclusive - immutable for top level VS, not used for child vs + lowerbound float64 + // upper bound - exclusive - immutable for top level VS, not used for child vs + upperbound float64 + + // effective lower bound + adjustedLowerBound float64 + // effective upper bound + adjustedUpperBound float64 + + // empty for child VS + splittVirtualNodeStores []*VirtualNodeStore + + // nil for top level VS + parentVirtualNodeStore *VirtualNodeStore // one virtual store can only have nodes from one resource partition location location.Location - clientId string + // client current VS is assigned to + clientId string + + // event queue that send node events to eventQueue *cache.NodeEventQueue } @@ -54,6 +78,85 @@ func (vs *VirtualNodeStore) GetHostNum() int { return len(vs.nodeEventByHash) } +func (vs *VirtualNodeStore) GetFreeHostNum() int { + if len(vs.splittVirtualNodeStores) > 0 { + freeHostCount := 0 + for i := 0; i < len(vs.splittVirtualNodeStores); i++ { + if !vs.splittVirtualNodeStores[i].IsAssignedToClient() { + vs.splittVirtualNodeStores[i].mu.RLock() + freeHostCount += len(vs.splittVirtualNodeStores[i].nodeEventByHash) + vs.splittVirtualNodeStores[i].mu.RUnlock() + } + } + return freeHostCount + } else if vs.IsAssignedToClient() { + return 0 + } else { + return vs.GetHostNum() + } +} + +func (vs *VirtualNodeStore) GetAllFreeChildStores() []*VirtualNodeStore { + if len(vs.splittVirtualNodeStores) > 0 { + freeStores := make([]*VirtualNodeStore, 0) + for i := 0; i < len(vs.splittVirtualNodeStores); i++ { + if !vs.splittVirtualNodeStores[i].IsAssignedToClient() { + freeStores = append(freeStores, vs.splittVirtualNodeStores[i]) + } + } + return freeStores + } else if vs.IsAssignedToClient() { + return nil + } else { + return []*VirtualNodeStore{vs} + } +} + +func (vs *VirtualNodeStore) IsValidVirtualNodeStore() bool { + if vs.parentVirtualNodeStore != nil { + if vs.parentVirtualNodeStore.parentVirtualNodeStore != nil { // Only two levels are allowed + return false + } + return vs.parentVirtualNodeStore.IsValidVirtualNodeStore() + } + + storeCount := len(vs.splittVirtualNodeStores) + if storeCount == 0 { + return true + } else if storeCount == 1 { + return false + } + + // check lower/upperbound of top vNode + if vs.lowerbound != vs.splittVirtualNodeStores[0].adjustedLowerBound || vs.upperbound != vs.splittVirtualNodeStores[storeCount-1].adjustedUpperBound { + return false + } + // check adjusted lower/upperbound + if vs.splittVirtualNodeStores[0].adjustedLowerBound >= vs.splittVirtualNodeStores[0].adjustedUpperBound { + return false + } + isParentVNodeFound := false + previousUpperBound := vs.splittVirtualNodeStores[0].adjustedUpperBound + for i := 0; i < storeCount; i++ { + currentVNode := vs.splittVirtualNodeStores[i] + if currentVNode.parentVirtualNodeStore == nil { + if !isParentVNodeFound { + isParentVNodeFound = true + } else { + return false + } + } + if i > 0 { + if previousUpperBound != currentVNode.adjustedLowerBound || currentVNode.adjustedLowerBound >= currentVNode.adjustedUpperBound { + return false + } + previousUpperBound = currentVNode.adjustedUpperBound + } + } + + return isParentVNodeFound +} + func (vs *VirtualNodeStore) GetLocation() location.Location { return vs.location } @@ -62,6 +165,13 @@ func (vs *VirtualNodeStore) GetAssignedClient() string { return vs.clientId } +func (vs *VirtualNodeStore) IsAssignedToClient() bool { + if vs.clientId != "" { + return true + } + return false +} + func (vs *VirtualNodeStore) AssignToClient(clientId string, eventQueue *cache.NodeEventQueue) bool { if vs.clientId != "" { return false @@ -80,10 +190,17 @@ func (vs *VirtualNodeStore) Release() { vs.clientId = "" } -func (vs *VirtualNodeStore) GetRange() (float64, float64) { +func (vs *VirtualNodeStore) GetOriginalRange() (float64, float64) { return vs.lowerbound, vs.upperbound } +func (vs *VirtualNodeStore) GetAdjustedRange() (float64, float64) { + if vs.parentVirtualNodeStore == nil && len(vs.splittVirtualNodeStores) == 0 { // top level vNode never being splitted + return vs.GetOriginalRange() + } + return vs.adjustedLowerBound, vs.adjustedUpperBound +} + // Snapshot generates a list of node for the List() call from a client, and a current RV map to client func (vs *VirtualNodeStore) SnapShot() ([]*types.LogicalNode, types.TransitResourceVersionMap) { vs.mu.RLock() @@ -120,6 +237,224 @@ func (vs *VirtualNodeStore) GenerateBookmarkEvent() *node.ManagedNodeEvent { return nil } +// Input requested host count +// TODO: error cases +func (vs *VirtualNodeStore) RequestCapacity(requestedHostCount int) []*VirtualNodeStore { + vs.mu.RLock() + defer vs.mu.RUnlock() + + freeHostNum := vs.GetFreeHostNum() + if freeHostNum <= requestedHostCount { + klog.Errorf("Free host count (%d) is less than or equal to requested host count (%d)", freeHostNum, requestedHostCount) + return nil + } + if vs.parentVirtualNodeStore != nil { + klog.Error("Request free host can only be done from parent node") + return nil + } + + if len(vs.splittVirtualNodeStores) == 0 { // first splitted vs + // create new virtual node store + newVSStore := &VirtualNodeStore{ + mu: sync.RWMutex{}, + nodeEventByHash: make(map[float64]*node.ManagedNodeEvent, freeHostNum-requestedHostCount), + location: vs.location, + parentVirtualNodeStore: vs, + adjustedLowerBound: vs.lowerbound, + adjustedUpperBound: vs.upperbound, + } + vs.adjustedLowerBound = vs.lowerbound + vs.adjustedUpperBound = vs.upperbound + vs.moveNodes(vs, newVSStore, freeHostNum-requestedHostCount, true) + + // Add new virtual node into store + vs.splittVirtualNodeStores = make([]*VirtualNodeStore, 2) + vs.splittVirtualNodeStores[0] = vs + vs.splittVirtualNodeStores[1] = newVSStore + return []*VirtualNodeStore{vs} + } else { + freeStores := vs.GetAllFreeChildStores() + candidateStores := make([]*VirtualNodeStore, 0) + count := 0 + for i := 0; i < len(freeStores); i++ { + newHostCount := count + freeStores[i].GetHostNum() + if newHostCount == requestedHostCount { + return append(candidateStores, freeStores[i]) + } else if newHostCount < requestedHostCount { + candidateStores = append(candidateStores, freeStores[i]) + } else { + newVSStore := &VirtualNodeStore{ + mu: sync.RWMutex{}, + nodeEventByHash: make(map[float64]*node.ManagedNodeEvent, newHostCount-requestedHostCount), + location: vs.location, + parentVirtualNodeStore: vs, + adjustedLowerBound: freeStores[i].adjustedLowerBound, + adjustedUpperBound: freeStores[i].adjustedUpperBound, + } + vs.moveNodes(freeStores[i], newVSStore, newHostCount-requestedHostCount, true) + + vs.splittVirtualNodeStores = append(vs.splittVirtualNodeStores, newVSStore) + sort.Sort(vs) + return append(candidateStores, freeStores[i]) + } + } + + // this should never be reached + klog.Error("Unexpected statement reach. Free host #: %v, requested host #: %v", freeHostNum, requestedHostCount) + return candidateStores + } +} + +func (vs *VirtualNodeStore) findIndexOfVS(vsToLocate *VirtualNodeStore) int { + for i := 0; i < len(vs.splittVirtualNodeStores); i++ { + if vs.splittVirtualNodeStores[i] == vsToLocate { + return i + } + } + return -1 +} + +// Return: +// 1. virtualNodeStore that is free and can move nodes to, if there is no, create a new one +// 2. False if adjancent VS is to the left of store[index], true if adjancent VS is to the right of the store[index] +func (vs *VirtualNodeStore) findAdjacentVacantStore(index int) (*VirtualNodeStore, bool) { + if index == 0 { // leftmost + if vs.splittVirtualNodeStores[1].clientId == "" { // assigned + return vs.splittVirtualNodeStores[1], true + } + } else if index == len(vs.splittVirtualNodeStores)-1 { // rightmost + if vs.splittVirtualNodeStores[index-1].clientId == "" { + return vs.splittVirtualNodeStores[index-1], false + } + } else { + if vs.splittVirtualNodeStores[index-1].clientId == "" { + return vs.splittVirtualNodeStores[index-1], false + } else if vs.splittVirtualNodeStores[index+1].clientId == "" { + return vs.splittVirtualNodeStores[index+1], true + } + } + + // create new store + newVSStore := &VirtualNodeStore{ + mu: sync.RWMutex{}, + nodeEventByHash: make(map[float64]*node.ManagedNodeEvent), + location: vs.location, + parentVirtualNodeStore: vs, + lowerbound: -1, + upperbound: -1, + } + vs.splittVirtualNodeStores = append(vs.splittVirtualNodeStores, newVSStore) + return newVSStore, true +} + +// Move #hostCount from sourceVNS to dstVNS +// If countFromHighEnd == true, choose hash value from biggest (move to right store) +// If countFromHighEnd == false, choose hash value from smallest (move to left store) +// Internal function, assume input is valid, no need to validate +func (vs *VirtualNodeStore) moveNodes(sourceVNS, dstVNS *VirtualNodeStore, hostCountToMove int, countFromHighEnd bool) error { + // optimization + if len(dstVNS.nodeEventByHash) == 0 && len(sourceVNS.nodeEventByHash) < hostCountToMove*2 { + switchVirtualNodeStore(sourceVNS, dstVNS) + sourceVNS, dstVNS = dstVNS, sourceVNS + hostCountToMove = len(sourceVNS.nodeEventByHash) - hostCountToMove + countFromHighEnd = !countFromHighEnd + } + + // sort nodes in sourceVNS by hash value + nodeHashes := make([]float64, len(sourceVNS.nodeEventByHash)) + index := 0 + for k, _ := range sourceVNS.nodeEventByHash { + nodeHashes[index] = k + index++ + } + sort.Float64s(nodeHashes) + + // starting position and bound adjustment + start := 0 + if countFromHighEnd { + start = index - 1 + sourceVNS.adjustedUpperBound = nodeHashes[len(sourceVNS.nodeEventByHash)-hostCountToMove] + dstVNS.adjustedLowerBound = sourceVNS.adjustedUpperBound + } else { + sourceVNS.adjustedLowerBound = nodeHashes[hostCountToMove] + dstVNS.adjustedUpperBound = sourceVNS.adjustedLowerBound + } + count := 0 + + // move nodes + for { + nodeHashKey := nodeHashes[start] + dstVNS.nodeEventByHash[nodeHashKey] = sourceVNS.nodeEventByHash[nodeHashKey] + delete(sourceVNS.nodeEventByHash, nodeHashKey) + count++ + + if count < hostCountToMove { + if countFromHighEnd { + start-- + } else { + start++ + } + } else { + break + } + } + + return nil +} + +// This function switch all elements of store1 and store2 +// No need to switch location - it should always be the same (otherwise, nodes cannot be switched) +func switchVirtualNodeStore(store1, store2 *VirtualNodeStore) { + // lock cannot be switched + //store1.mu, store2.mu = store2.mu, store1.mu + store1.nodeEventByHash, store2.nodeEventByHash = store2.nodeEventByHash, store1.nodeEventByHash + + // lower and upper bounds are immutable + //store1.lowerbound, store2.lowerbound = store2.lowerbound, store1.lowerbound + //store1.upperbound, store2.upperbound = store2.upperbound, store1.upperbound + store1.adjustedLowerBound, store2.adjustedLowerBound = store2.adjustedLowerBound, store1.adjustedLowerBound + store1.adjustedUpperBound, store2.adjustedUpperBound = store2.adjustedUpperBound, store1.adjustedUpperBound + + store1.splittVirtualNodeStores, store2.splittVirtualNodeStores = store2.splittVirtualNodeStores, store1.splittVirtualNodeStores + // parent stores CANNOT be switched + //store1.parentVirtualNodeStore, store2.parentVirtualNodeStore = store2.parentVirtualNodeStore, store1.parentVirtualNodeStore + + if store1.clientId != store2.clientId { + store1.clientId, store2.clientId = store2.clientId, store1.clientId + } + if store1.eventQueue != store2.eventQueue { + store1.eventQueue, store2.eventQueue = store2.eventQueue, store1.eventQueue + } +} + +// Implement sort.Interface +func (vs *VirtualNodeStore) Len() int { + return len(vs.splittVirtualNodeStores) +} + +func (vs *VirtualNodeStore) Less(i, j int) bool { + return vs.splittVirtualNodeStores[i].adjustedLowerBound < vs.splittVirtualNodeStores[j].adjustedLowerBound +} + +func (vs *VirtualNodeStore) Swap(i, j int) { + vs.splittVirtualNodeStores[i], vs.splittVirtualNodeStores[j] = vs.splittVirtualNodeStores[j], vs.splittVirtualNodeStores[i] +} + +func (vs *VirtualNodeStore) GetBoundaris() string { + vs.mu.RLock() + defer vs.mu.RUnlock() + + result := new(bytes.Buffer) + result.WriteString(fmt.Sprintf("original boundary (%v, %v), adjusted boundary (%v, %v)\n", + vs.lowerbound, vs.upperbound, vs.adjustedLowerBound, vs.adjustedUpperBound)) + for i := 0; i < len(vs.splittVirtualNodeStores); i++ { + result.WriteString(fmt.Sprintf("Child store %d, boundary (%v, %v)\n", + i, vs.splittVirtualNodeStores[i].adjustedLowerBound, vs.splittVirtualNodeStores[i].adjustedUpperBound)) + } + + return string(result.Bytes()) +} + type NodeStore struct { // granularity of the ring - degree for each virtual node managed arc granularOfRing float64 @@ -200,7 +535,7 @@ func (ns *NodeStore) CheckFreeCapacity(requestedHostNum int) bool { defer ns.nsLock.Unlock() allocatableHostNum := 0 for _, vs := range *ns.vNodeStores { - allocatableHostNum += vs.GetHostNum() + allocatableHostNum += vs.GetFreeHostNum() if allocatableHostNum >= requestedHostNum { return true } @@ -361,7 +696,23 @@ func (ns *NodeStore) getNodeHash(node *node.ManagedNodeEvent) (float64, int) { func (ns *NodeStore) getVirtualNodeStore(node *node.ManagedNodeEvent) (float64, int, *VirtualNodeStore) { hashValue, ringId := ns.getNodeHash(node) virtualNodeIndex := int(math.Floor(hashValue / ns.granularOfRing)) - return hashValue, ringId, (*ns.vNodeStores)[virtualNodeIndex] + parentVNS := (*ns.vNodeStores)[virtualNodeIndex] + if len(parentVNS.splittVirtualNodeStores) > 0 && (hashValue < parentVNS.adjustedLowerBound || hashValue >= parentVNS.adjustedUpperBound) { + // prevent node splitting happening during search + parentVNS.mu.RLock() + defer parentVNS.mu.RUnlock() + // node shall locate in child VirtualNodeStore + // linear search first, might need to change to binary search based on performance + for i := 0; i < len(parentVNS.splittVirtualNodeStores); i++ { + if hashValue >= parentVNS.splittVirtualNodeStores[i].adjustedLowerBound && hashValue < parentVNS.splittVirtualNodeStores[i].adjustedUpperBound { + return hashValue, ringId, parentVNS.splittVirtualNodeStores[i] + } + } + + klog.Errorf("Could not find node position in virtual node stores. HashValue %v, VS %s", hashValue, parentVNS.GetBoundaris()) + } + + return hashValue, ringId, parentVNS } func (ns *NodeStore) addNodeToRing(nodeEvent *node.ManagedNodeEvent) (isNewNode bool) { diff --git a/resource-management/pkg/distributor/storage/nodestore_test.go b/resource-management/pkg/distributor/storage/nodestore_test.go new file mode 100644 index 0000000..8f25ed6 --- /dev/null +++ b/resource-management/pkg/distributor/storage/nodestore_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2022 Authors of Global Resource Service. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "global-resource-service/resource-management/pkg/common-lib/hash" + "global-resource-service/resource-management/pkg/common-lib/types" + "global-resource-service/resource-management/pkg/common-lib/types/location" + "global-resource-service/resource-management/pkg/common-lib/types/runtime" + "global-resource-service/resource-management/pkg/distributor/node" + "math" + "strconv" + "sync" + "testing" +) + +func TestAdjustCapacity_1stSplit(t *testing.T) { + for i := 25; i < 80; i++ { + for j := 1; j < i; j++ { + vs := generateInitVirtalNodes(i) + lowerBoundOld := vs.lowerbound + upperBoundOld := vs.upperbound + vsPointerOld := vs + + vs.RequestCapacity(j) + + // check result + assert.Nil(t, vs.parentVirtualNodeStore) + assert.Equal(t, j, len(vs.nodeEventByHash)) + assert.True(t, vs.IsValidVirtualNodeStore()) + + assert.Equal(t, 2, len(vs.splittVirtualNodeStores)) + assert.NotEqual(t, vs.splittVirtualNodeStores[0], vs.splittVirtualNodeStores[1]) + assert.Equal(t, i, len(vs.splittVirtualNodeStores[0].nodeEventByHash)+len(vs.splittVirtualNodeStores[1].nodeEventByHash)) + + // init vs has immutable lower & upper bound + assert.Equal(t, lowerBoundOld, vs.lowerbound) + assert.Equal(t, upperBoundOld, vs.upperbound) + vs.mu.Lock() + vs.mu.Unlock() + + assert.Equal(t, vsPointerOld, vs) + assert.Equal(t, len(vs.nodeEventByHash), vs.GetHostNum()) + } + } +} + +func TestAdjustCapacity_Merge(t *testing.T) { + // one +} + +func generateInitVirtalNodes(hostNum int) *VirtualNodeStore { + loc := location.NewLocation(location.Beijing, location.ResourcePartition2) + vs := &VirtualNodeStore{ + mu: sync.RWMutex{}, + nodeEventByHash: make(map[float64]*node.ManagedNodeEvent, hostNum), + location: *loc, + } + + lowerBound := float64(math.MaxUint64) + upperBound := float64(-1) + for i := 0; i < hostNum; i++ { + n := createRandomNode(i, loc) + ne := runtime.NewNodeEvent(n, runtime.Added) + mgmtNE := node.NewManagedNodeEvent(ne, loc) + hashValue := float64(hash.HashStrToUInt64(mgmtNE.GetId())) / math.MaxUint64 * 360 + vs.nodeEventByHash[hashValue] = mgmtNE + + if lowerBound > hashValue { + lowerBound = hashValue + } + if upperBound < hashValue { + upperBound = hashValue + } + } + + vs.lowerbound = lowerBound + // upperBound is exclusive, increase a bit + vs.upperbound = upperBound + 1 + + return vs +} + +func createRandomNode(rv int, loc *location.Location) *types.LogicalNode { + id := uuid.New() + return &types.LogicalNode{ + Id: id.String(), + ResourceVersion: strconv.Itoa(rv), + GeoInfo: types.NodeGeoInfo{ + Region: types.RegionName(loc.GetRegion()), + ResourcePartition: types.ResourcePartitionName(loc.GetResourcePartition()), + }, + } +}