Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions resource-management/pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requested
allStores := dis.defaultNodeStore.GetVirtualStores()
freeStores := make(map[*storage.VirtualNodeStore]bool)
for _, vs := range *allStores {
if vs.GetAssignedClient() == "" && vs.GetHostNum() > 0 {
if vs.GetFreeHostNum() > 0 {
freeStores[vs] = true
}
}
Expand All @@ -131,9 +131,20 @@ func (dis *ResourceDistributor) allocateNodesToClient(clientId string, requested
assignedHostCount := 0
hostAssignIsOK := false
for i := 0; i < len(storesToSelectInorder); i++ {
selectedStores = append(selectedStores, storesToSelectInorder[i])
assignedHostCount += (*storesToSelectInorder[i]).GetHostNum()
if assignedHostCount >= requestedHostNum {
candidateStore := storesToSelectInorder[i]

newHostCount := assignedHostCount + candidateStore.GetFreeHostNum()
if newHostCount <= requestedHostNum {
selectedStores = append(selectedStores, candidateStore.GetAllFreeChildStores()...)
assignedHostCount = newHostCount
if newHostCount == requestedHostNum {
hostAssignIsOK = true
break
}
} else { // split vNode
requestedStores := candidateStore.RequestCapacity(requestedHostNum - assignedHostCount)
selectedStores = append(selectedStores, requestedStores...)
assignedHostCount += candidateStore.GetHostNum()
hostAssignIsOK = true
break
}
Expand Down Expand Up @@ -291,7 +302,7 @@ func (dis *ResourceDistributor) persistVirtualNodesAssignment(clientId string, a
vNodeToSave := &store.VirtualNodeConfig{
Location: s.GetLocation(),
}
vNodeToSave.Lowerbound, vNodeToSave.Upperbound = s.GetRange()
vNodeToSave.Lowerbound, vNodeToSave.Upperbound = s.GetOriginalRange()
vNodeConfigs[i] = vNodeToSave
}
assignment := &store.VirtualNodeAssignment{
Expand Down
28 changes: 14 additions & 14 deletions resource-management/pkg/distributor/distributor_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,24 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) {
updateEventNum: 1000,
},
{
name: "Test 10K nodes with 5 clients each has 500 , each got 10K update events",
name: "Test 10K nodes with 5 clients each has 500 hosts, each got 10K update events",
nodeNum: 10000,
clientNum: 5,
hostPerClient: 500,
updateEventNum: 10000,
},
{
name: "Test 10K nodes with 5 clients each has 500 , each got 100K update events",
name: "Test 10K nodes with 20 clients each has 500 hosts, each got 100K update events",
nodeNum: 10000,
clientNum: 5,
clientNum: 20,
hostPerClient: 500,
updateEventNum: 100000,
},
{
name: "Test 1M nodes with 50 clients each has 15000 , each got 100K update events",
name: "Test 1M nodes with 50 clients each has 20000 hosts, each got 100K update events",
nodeNum: 1000000,
clientNum: 50,
hostPerClient: 15000,
hostPerClient: 20000,
updateEventNum: 100000,
},
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestSingleRPMutipleClients_Workflow(t *testing.T) {

assert.Nil(t, err)
assert.NotNil(t, latestRVs)
assert.True(t, len(nodes) >= tt.hostPerClient)
assert.Equal(t, tt.hostPerClient, len(nodes))
// t.Logf("Client %d %s latest rvs: %v.Total hosts: %d\n", i, clientId, latestRVs, len(nodes))
latestRVsByClient[i] = latestRVs
nodesByClient[i] = nodes
Expand Down Expand Up @@ -217,13 +217,13 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) {
updateEventNum: 10000,
},
{
name: "Test 6 region, each has 20 RPs, 40K hosts per RP, 4.8M nodes with 200 clients, each got 20K hosts, 20K update events",
regionNum: 6,
rpNum: 20,
hostPerRP: 40000,
clientNum: 200,
hostPerClient: 20000,
updateEventNum: 20000,
name: "Test 5 region, each has 40 RPs, 25K hosts per RP, 5M nodes with 100 clients, each got 50K hosts, 50K update events",
regionNum: 5,
rpNum: 40,
hostPerRP: 25000,
clientNum: 100,
hostPerClient: 50000,
updateEventNum: 50000,
},
}

Expand Down Expand Up @@ -297,7 +297,7 @@ func TestMultipleRPsMutipleClients_Workflow(t *testing.T) {

assert.Nil(t, err)
assert.NotNil(t, latestRVs)
assert.True(t, len(nodes) >= tt.hostPerClient)
assert.Equal(t, tt.hostPerClient, len(nodes))
// t.Logf("Client %d %s latest rvs: %v.Total hosts: %d\n", i, clientId, latestRVs, len(nodes))
latestRVsByClient[i] = latestRVs
nodesByClient[i] = nodes
Expand Down
107 changes: 66 additions & 41 deletions resource-management/pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestDistributorInit(t *testing.T) {
store := (*defaultNodeStores)[i]
assert.Equal(t, 0, store.GetHostNum(), "Initial host number should be 0")
assert.Equal(t, "", store.GetAssignedClient(), "Virtual store should not be assigned to any client")
lowerBound, upperBound := store.GetRange()
lowerBound, upperBound := store.GetOriginalRange()
assert.Equal(t, lower, lowerBound, "Expecting lower bound %f but got %f. store id %d, hash range (%f, %f]", lower, lowerBound, i, lowerBound, upperBound)
assert.NotEqual(t, lowerBound, upperBound, "Expecting lower bound not equal to upper bound for virtual store %d. Got hash range (%f, %f]", i, lowerBound, upperBound)
lower = upperBound
Expand Down Expand Up @@ -337,45 +337,70 @@ func TestRegisterClient_ErrorCases(t *testing.T) {
}

func TestRegisterClient_WithinLimit(t *testing.T) {
distributor := setUp()
defer tearDown()

result, rvMap := distributor.ProcessEvents(generateAddNodeEvent(10000, defaultLocBeijing_RP1))
assert.True(t, result)
assert.NotNil(t, rvMap)
assert.Equal(t, 10000, distributor.defaultNodeStore.GetTotalHostNum())
vNodeNumberPerRPToTest := []int{2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 50, 51, 100, 101, 200, 201, 500, 501}
for _, v := range vNodeNumberPerRPToTest {
t.Logf("Test register client with virutalStoreNumPerResourcePartition = %d", v)
virutalStoreNumPerResourcePartition = v
distributor := setUp()

totalHostCount := 10000
result, rvMap := distributor.ProcessEvents(generateAddNodeEvent(totalHostCount, defaultLocBeijing_RP1))
assert.True(t, result)
assert.NotNil(t, rvMap)
assert.Equal(t, totalHostCount, distributor.defaultNodeStore.GetTotalHostNum())

allHosts := make(map[string]bool, 0)
requestedHostNum := 500
for i := 0; i < 20; i++ {
start := time.Now()
client := types.Client{ClientId: uuid.New().String(), Resource: types.ResourceRequest{TotalMachines: requestedHostNum}, ClientInfo: types.ClientInfoType{}}
err := distributor.RegisterClient(&client)
duration := time.Since(start)

clientId := client.ClientId
assert.NotNil(t, clientId, "Expecting not nil client id")
assert.False(t, clientId == "", "Expecting non empty client id")
assert.Nil(t, err, "Expecting nil error")

// check virtual node assignment
virtualStoresAssignedToClient, isOK := distributor.clientToStores[clientId]
assert.True(t, isOK, "Expecting get virtual stores assigned to client %s", clientId)
assert.True(t, len(virtualStoresAssignedToClient) > 0, "Expecting get non empty virtual stores assigned to client %s", clientId)
hostCount := 0
for j := 0; j < len(virtualStoresAssignedToClient); j++ {
vs := virtualStoresAssignedToClient[j]
assert.Equal(t, clientId, vs.GetAssignedClient(), "Unexpected virtual store client id %s", clientId)
lower, upper := vs.GetAdjustedRange()
t.Logf("Virtual node store (%f, %f] is assigned to client %s, host number %d. vNodePerPR %v", lower, upper, clientId, vs.GetHostNum(), virutalStoreNumPerResourcePartition)
if !vs.IsValidVirtualNodeStore() {
t.Fatalf("Invalid vNode store for client %v", clientId)
}
hostCount += vs.GetHostNum()
}
t.Logf("Total %d hosts are assigned to client %s\nTook %v to register the client.\n", hostCount, clientId, duration)
if requestedHostNum != hostCount {
assert.Equal(t, requestedHostNum, hostCount, "Assigned host number %d is not equal to requested %d", hostCount, requestedHostNum)
t.Fatalf("virutalStoreNumPerResourcePartition = %d, client count %v", virutalStoreNumPerResourcePartition, i)
}

requestedHostNum := 500
for i := 0; i < 10; i++ {
start := time.Now()
client := types.Client{ClientId: uuid.New().String(), Resource: types.ResourceRequest{TotalMachines: requestedHostNum}, ClientInfo: types.ClientInfoType{}}
err := distributor.RegisterClient(&client)
duration := time.Since(start)

clientId := client.ClientId
assert.NotNil(t, clientId, "Expecting not nil client id")
assert.False(t, clientId == "", "Expecting non empty client id")
assert.Nil(t, err, "Expecting nil error")

// check virtual node assignment
virtualStoresAssignedToClient, isOK := distributor.clientToStores[clientId]
assert.True(t, isOK, "Expecting get virtual stores assigned to client %s", clientId)
assert.True(t, len(virtualStoresAssignedToClient) > 0, "Expecting get non empty virtual stores assigned to client %s", clientId)
hostCount := 0
for i := 0; i < len(virtualStoresAssignedToClient); i++ {
vs := virtualStoresAssignedToClient[i]
assert.Equal(t, clientId, vs.GetAssignedClient(), "Unexpected virtual store client id %s", clientId)
lower, upper := vs.GetRange()
t.Logf("Virtual node store (%f, %f] is assigned to client %s, host number %d\n", lower, upper, clientId, vs.GetHostNum())
hostCount += vs.GetHostNum()
// check nodes number with list nodes
nodes, _, err := distributor.ListNodesForClient(clientId)
assert.Nil(t, err, "List nodes by client id should be successful")
assert.Equal(t, hostCount, len(nodes), "Node count from virtual store should be same as list nodes")

// check node allocation and make sure there are no duplicates
for _, n := range nodes {
if _, isOK := allHosts[n.Id]; isOK {
t.Fatalf("Host was assigned to another client but reassigned again, id %v", n.Id)
} else {
allHosts[n.Id] = true
}
}
}
t.Logf("Total %d hosts are assigned to client %s\nTook %v to register the client.\n", hostCount, clientId, duration)
assert.True(t, hostCount >= requestedHostNum, "Assigned host number %d is less than requested %d", hostCount, requestedHostNum)

// check nodes number with list nodes
nodes, _, err := distributor.ListNodesForClient(clientId)
assert.Nil(t, err, "List nodes by client id should be successful")
assert.Equal(t, hostCount, len(nodes), "Node count from virtual store should be same as list nodes")
assert.Equal(t, totalHostCount, len(allHosts))

tearDown()
}
}

Expand Down Expand Up @@ -411,7 +436,7 @@ func TestRegistrationWorkflow(t *testing.T) {
nodes, latestRVs, err := distributor.ListNodesForClient(clientId)
assert.Nil(t, err)
assert.NotNil(t, latestRVs)
assert.True(t, len(nodes) >= 500)
assert.Equal(t, requestedHostNum, len(nodes))
t.Logf("Latest rvs: %v. Total hosts: %d\n", latestRVs, len(nodes))
// check each node event
nodeIds := make(map[string]bool)
Expand Down Expand Up @@ -536,7 +561,7 @@ func TestRegistration5MCase(t *testing.T) {

assert.Nil(t, err2)
assert.NotNil(t, latestRVs)
assert.True(t, len(nodes) >= tt.hostPerScheduler)
assert.Equal(t, tt.hostPerScheduler, len(nodes))
klog.Infof("List nodes for client %s took %v", clientId, duration)
}

Expand All @@ -545,7 +570,7 @@ func TestRegistration5MCase(t *testing.T) {
}

wg.Wait()
assert.Equal(t, 1, totalErrors)
assert.Equal(t, 0, totalErrors)
t.Logf("%s succeed", tt.name)
})
}
Expand Down Expand Up @@ -576,7 +601,7 @@ func TestWatchRenewal(t *testing.T) {
nodes, latestRVs, err := distributor.ListNodesForClient(clientId)
assert.Nil(t, err)
assert.NotNil(t, latestRVs)
assert.True(t, len(nodes) >= 500)
assert.Equal(t, requestedHostNum, len(nodes))
t.Logf("Latest rvs: %v. Total hosts: %d\n", latestRVs, len(nodes))
// check each node event
nodeIds := make(map[string]bool)
Expand Down
Loading