-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathaggregate.go
More file actions
430 lines (370 loc) · 14.4 KB
/
Copy pathaggregate.go
File metadata and controls
430 lines (370 loc) · 14.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
package cloudmanager
import (
"encoding/json"
"fmt"
"log"
"strconv"
"strings"
"time"
"github.com/fatih/structs"
)
// createAggregateRequest the users input for creating an Aggregate
type createAggregateRequest struct {
Name string `structs:"name"`
WorkingEnvironmentID string `structs:"workingEnvironmentId"`
NumberOfDisks int `structs:"numberOfDisks,omitempty"`
DiskSize diskSize `structs:"diskSize,omitempty"`
HomeNode string `structs:"homeNode,omitempty"`
ProviderVolumeType string `structs:"providerVolumeType,omitempty"`
CapacityTier string `structs:"capacityTier,omitempty"`
Iops int `structs:"iops,omitempty"`
Throughput int `structs:"throughput,omitempty"`
InitialEvAggregateSize diskSize `structs:"initialEvAggregateSize,omitempty"`
}
// diskSize struct
type diskSize struct {
Size int `structs:"size"`
Unit string `structs:"unit"`
}
// aggregateResult from aggregate request
type aggregateResult struct {
Name string `json:"name"`
AvailableCapacity capacity `json:"availableCapacity"`
TotalCapacity capacity `json:"totalCapacity"`
UsedCapacity capacity `json:"usedCapacity"`
Volumes []volume `json:"volumes"`
ProviderVolumes []providerVolume `json:"providerVolumes"`
Disks []disk `json:"disks"`
State string `json:"state"`
EncryptionType string `json:"encryptionType"`
EncryptionKeyID string `json:"encryptionKeyId"`
IsRoot bool `json:"isRoot"`
HomeNode string `json:"homeNode"`
OwnerNode string `json:"ownerNode"`
CapacityTier string `json:"capacityTier"`
CapacityTierUsed capacity `json:"capacityTierUsed"`
SidlEnabled bool `json:"sidlEnabled"`
SnaplockType string `json:"snaplockType"`
}
type capacity struct {
Size float64 `json:"size"`
Unit string `json:"unit"`
}
type volume struct {
Name string `json:"name"`
TotalSize capacity `json:"totalSize"`
UsedSize capacity `json:"usedSize"`
ThinProvisioned bool `json:"thinProvisioned"`
IsClone bool `json:"isClone"`
RootVolume bool `json:"rootVolume"`
}
type providerVolume struct {
ID string `json:"id"`
Name string `json:"name"`
Size capacity `json:"size"`
State string `json:"state"`
Device string `json:"device"`
InstanceID string `json:"instanceId"`
DiskType string `json:"diskType"`
Encrypted bool `json:"encrypted"`
Iops int `json:"iops"`
Throughput int `json:"throughput"`
}
type disk struct {
Name string `json:"name"`
Position string `json:"position"`
OwnerNode string `json:"ownerNode"`
Device string `json:"device"`
VMDiskProperties vmDiskProperties `json:"vmDiskProperties"`
}
type vmDiskProperties struct {
ObjectName string `json:"objectName"`
StorageAccountName string `json:"storageAccountName"`
ContainerName string `json:"containerName"`
}
type aggregateRequest struct {
WorkingEnvironmentID string `structs:"workingEnvironmentId"`
Name string `structs:"name"`
ID string `structs:"uuid"`
}
type deleteAggregateRequest struct {
WorkingEnvironmentID string `structs:"workingEnvironmentId"`
Name string `structs:"name"`
}
type updateAggregateRequest struct {
WorkingEnvironmentID string `structs:"workingEnvironmentId"`
Name string `structs:"name"`
NumberOfDisks int `structs:"numberOfDisks"`
}
type increaseAggregateCapacityRequest struct {
WorkingEnvironmentID string `structs:"workingEnvironmentId"`
AggregateName string `structs:"aggregateName"`
CapacityToAdd diskSize `structs:"capacityToAdd"`
}
// get aggregate by workingEnvironmentId+aggregate name
func (c *Client) getAggregate(request aggregateRequest, name string, sourceWorkingEnvironmentType string, clientID string, isSaaS bool, connectorIP string) (aggregateResult, error) {
log.Printf("getAggregate %s", name)
hostType := "CloudManagerHost"
if !isSaaS {
hostType = "http://" + connectorIP
}
var baseURL string
if sourceWorkingEnvironmentType == "ON_PREM" {
baseURL = fmt.Sprintf("/occm/api/onprem/aggregates?workingEnvironmentId=%s", request.WorkingEnvironmentID)
} else {
rootURL, cloudProviderName, err := c.getAPIRoot(request.WorkingEnvironmentID, clientID, isSaaS, connectorIP)
if err != nil {
log.Print("getAggregate: Cannot get API root.")
return aggregateResult{}, err
}
if cloudProviderName != "Amazon" {
baseURL = fmt.Sprintf("%s/aggregates/%s", rootURL, request.WorkingEnvironmentID)
} else {
baseURL = fmt.Sprintf("%s/aggregates?workingEnvironmentId=%s", rootURL, request.WorkingEnvironmentID)
}
}
var aggregates []aggregateResult
statusCode, response, _, err := c.CallAPIMethod("GET", baseURL, nil, c.Token, hostType, clientID)
if err != nil {
log.Printf("getAggregate request failed. Response %v, err %v", response, err)
return aggregateResult{}, err
}
responseError := apiResponseChecker(statusCode, response, "getAggregate")
if responseError != nil {
return aggregateResult{}, responseError
}
if err := json.Unmarshal(response, &aggregates); err != nil {
log.Print("Failed to unmarshall response from getAggregates")
return aggregateResult{}, err
}
log.Printf("getAggregate: get list of aggregates. %v", aggregates)
log.Printf("Find the match one. %v", name)
for i := range aggregates {
if aggregates[i].Name == name {
log.Printf("Found aggregate: %#v state %s", aggregates[i], aggregates[i].State)
return aggregates[i], nil
}
}
log.Print("Cannot find the aggregate")
return aggregateResult{}, nil
}
// create aggregate
func (c *Client) createAggregate(request *createAggregateRequest, clientID string, isSaaS bool, connectorIP string) (aggregateResult, error) {
log.Printf("createAggregate %v... ", (*request).Name)
params := structs.Map(request)
hostType := "CloudManagerHost"
if !isSaaS {
hostType = "http://" + connectorIP
}
var baseURL string
rootURL, _, err := c.getAPIRoot(request.WorkingEnvironmentID, clientID, isSaaS, connectorIP)
if err != nil {
log.Print("createAggregate: Cannot get API root.")
return aggregateResult{}, err
}
baseURL = fmt.Sprintf("%s/aggregates", rootURL)
retries := 0
maxRetries := 24 // max retry 150 sec * 24 = 1hr
for {
log.Print("Call aggregate creation API... ", (*request).Name)
statusCode, response, onCloudRequestID, err := c.CallAPIMethod("POST", baseURL, params, c.Token, hostType, clientID)
if err != nil {
log.Print("createAggregate request failed", (*request).Name)
return aggregateResult{}, err
}
responseError := apiResponseChecker(statusCode, response, "createAggregate")
if responseError != nil {
if strings.Contains(responseError.Error(), "code: 409, message: {\"message\":\"Couldn't perform action Create Aggregate, because there are ongoing operations which might interfere with it") {
if retries >= maxRetries {
log.Print("Failed: Reached aggregate creation max retries.")
break
}
retries++
log.Print("Wait for 150 seconds... ", retries)
time.Sleep(150 * time.Second)
} else {
return aggregateResult{}, responseError
}
} else {
// wait for creation
log.Print("Wait for aggregate creation... ", (*request).Name)
if isSaaS {
err = c.waitOnCompletion(onCloudRequestID, "Aggregate", "create", 15, 60, clientID)
} else {
err = c.waitOnCompletionForNotSaas(onCloudRequestID, "Aggregate", "create", 15, 60, clientID, connectorIP)
}
log.Print("Finish waiting... ", (*request).Name)
if err != nil {
return aggregateResult{}, err
}
break
}
}
workingEnvDetail, err := c.getWorkingEnvironmentInfo(request.WorkingEnvironmentID, clientID, isSaaS, connectorIP)
if err != nil {
log.Print("Cannot get working environment information.")
return aggregateResult{}, err
}
var aggregate aggregateResult
aggregate, err = c.getAggregate(aggregateRequest{WorkingEnvironmentID: request.WorkingEnvironmentID}, request.Name, workingEnvDetail.WorkingEnvironmentType, clientID, isSaaS, connectorIP)
if err != nil {
return aggregateResult{}, err
}
log.Printf("Aggregate %v status %v", aggregate.Name, aggregate.State)
return aggregate, nil
}
// delete aggregate
func (c *Client) deleteAggregate(request deleteAggregateRequest, clientID string, isSaaS bool, connectorIP string) error {
log.Print("On deleteAggregate... ")
hostType := "CloudManagerHost"
if !isSaaS {
hostType = "http://" + connectorIP
}
var baseURL string
rootURL, _, err := c.getAPIRoot(request.WorkingEnvironmentID, clientID, isSaaS, connectorIP)
if err != nil {
log.Print("deleteAggregate: Cannot get API root.")
return err
}
baseURL = fmt.Sprintf("%s/aggregates/%s/%s", rootURL, request.WorkingEnvironmentID, request.Name)
statusCode, response, onCloudRequestID, err := c.CallAPIMethod("DELETE", baseURL, nil, c.Token, hostType, clientID)
if err != nil {
log.Print("deleteAggregate request failed")
return err
}
responseError := apiResponseChecker(statusCode, response, "deleteAggregate")
if responseError != nil {
return responseError
}
log.Print("Wait for aggregate deletion.")
if isSaaS {
err = c.waitOnCompletion(onCloudRequestID, "Aggregate", "delete", 15, 60, clientID)
} else {
err = c.waitOnCompletionForNotSaas(onCloudRequestID, "Aggregate", "delete", 15, 60, clientID, connectorIP)
}
return err
}
func (c *Client) updateAggregate(request updateAggregateRequest, clientID string, isSaaS bool, connectorIP string) error {
log.Print("updateAggregate... ")
params := structs.Map(request)
hostType := "CloudManagerHost"
if !isSaaS {
hostType = "http://" + connectorIP
}
var baseURL string
rootURL, _, err := c.getAPIRoot(request.WorkingEnvironmentID, clientID, isSaaS, connectorIP)
if err != nil {
log.Print("updateAggregate: Cannot get API root.")
return err
}
baseURL = fmt.Sprintf("%s/aggregates/%s/%s/disks", rootURL, request.WorkingEnvironmentID, request.Name)
statusCode, response, onCloudRequestID, err := c.CallAPIMethod("POST", baseURL, params, c.Token, hostType, clientID)
if err != nil {
log.Print("updateAggregate request failed")
return err
}
responseError := apiResponseChecker(statusCode, response, "updateAggregate")
if responseError != nil {
return responseError
}
log.Print("Wait for aggregate update.")
if isSaaS {
err = c.waitOnCompletion(onCloudRequestID, "Aggregate", "update", 10, 60, clientID)
} else {
err = c.waitOnCompletionForNotSaas(onCloudRequestID, "Aggregate", "update", 10, 60, clientID, connectorIP)
}
return err
}
// increaseAggregateCapacity increases the capacity of an aggregate using Amazon EBS Elastic Volumes
func (c *Client) increaseAggregateCapacity(request increaseAggregateCapacityRequest, clientID string, isSaaS bool, connectorIP string) error {
log.Printf("increaseAggregateCapacity for aggregate %s by %d %s", request.AggregateName, request.CapacityToAdd.Size, request.CapacityToAdd.Unit)
params := structs.Map(request)
hostType := "CloudManagerHost"
if !isSaaS {
hostType = "http://" + connectorIP
}
var baseURL string
rootURL, cloudProviderName, err := c.getAPIRoot(request.WorkingEnvironmentID, clientID, isSaaS, connectorIP)
if err != nil {
log.Print("increaseAggregateCapacity: Cannot get API root.")
return err
}
// Only AWS supports aggregate capacity increase
if cloudProviderName != "Amazon" {
return fmt.Errorf("aggregate capacity increase is currently only supported for AWS")
}
// Build the API endpoint using the root URL from getAPIRoot
baseURL = fmt.Sprintf("%s/aggregates/%s/%s/add-capacity", rootURL, request.WorkingEnvironmentID, request.AggregateName)
statusCode, response, onCloudRequestID, err := c.CallAPIMethod("POST", baseURL, params, c.Token, hostType, clientID)
if err != nil {
log.Print("increaseAggregateCapacity request failed")
return err
}
responseError := apiResponseChecker(statusCode, response, "increaseAggregateCapacity")
if responseError != nil {
return responseError
}
log.Print("Wait for aggregate capacity increase.")
if isSaaS {
err = c.waitOnCompletion(onCloudRequestID, "Aggregate", "increase capacity", 15, 60, clientID)
} else {
err = c.waitOnCompletionForNotSaas(onCloudRequestID, "Aggregate", "increase capacity", 15, 60, clientID, connectorIP)
}
return err
}
// flattenCapacity: convert struct size + unit
func flattenCapacity(c capacity) interface{} {
flattened := make(map[string]interface{})
flattened["size"] = strconv.FormatFloat(c.Size, 'f', -1, 64)
flattened["unit"] = c.Unit
return flattened
}
func flattenDisks(d []disk) interface{} {
dts := make([]map[string]interface{}, 0, len(d))
for _, diskelement := range d {
dt := make(map[string]interface{})
dt["name"] = diskelement.Name
dt["position"] = diskelement.Position
dt["device"] = diskelement.Device
dt["owner_node"] = diskelement.OwnerNode
vdp := make(map[string]interface{})
vdp["object_name"] = diskelement.VMDiskProperties.ObjectName
vdp["storage_account_name"] = diskelement.VMDiskProperties.StorageAccountName
vdp["container_name"] = diskelement.VMDiskProperties.ContainerName
dt["vm_disk_properties"] = vdp
dts = append(dts, dt)
}
return dts
}
func flattenVolumes(v []volume) interface{} {
volumes := make([]map[string]interface{}, 0, len(v))
for _, volume := range v {
vol := make(map[string]interface{})
vol["name"] = volume.Name
vol["thin_provisioned"] = volume.ThinProvisioned
vol["root_volume"] = volume.RootVolume
vol["is_clone"] = volume.IsClone
vol["total_size"] = flattenCapacity(volume.TotalSize)
vol["used_size"] = flattenCapacity(volume.UsedSize)
volumes = append(volumes, vol)
}
return volumes
}
func flattenProviderVolumes(v []providerVolume) interface{} {
volumes := make([]map[string]interface{}, 0, len(v))
for _, volume := range v {
vol := make(map[string]interface{})
vol["id"] = volume.ID
vol["name"] = volume.Name
vol["state"] = volume.State
vol["device"] = volume.Device
vol["instance_id"] = volume.InstanceID
vol["disk_type"] = volume.DiskType
vol["encrypted"] = volume.Encrypted
vol["iops"] = volume.Iops
vol["throughput"] = volume.Throughput
vol["size"] = flattenCapacity(volume.Size)
volumes = append(volumes, vol)
}
return volumes
}