Skip to content

Commit dfc2c64

Browse files
authored
Merge pull request #8 from halprin/try-conurrency
Limit Concurrency
2 parents f3a9844 + fca14ef commit dfc2c64

5 files changed

Lines changed: 177 additions & 15 deletions

File tree

dynamo/concurrency.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package dynamo
2+
3+
import (
4+
"github.com/aws/aws-sdk-go/service/dynamodb"
5+
"log"
6+
"math"
7+
"runtime"
8+
)
9+
10+
func determineConcurrency(tableName string) (int, error) {
11+
tableInfo, err := describeTable(tableName)
12+
if err != nil {
13+
return 0, err
14+
}
15+
16+
if isOnDemand(tableInfo) {
17+
concurrency := getOnDemandConcurrency()
18+
log.Printf("Given on demand concurrency, set concurrency to %d\n", concurrency)
19+
return concurrency, nil
20+
}
21+
22+
//the table has provisioned capacity
23+
24+
numberOfItems := getNumberOfItems(tableInfo)
25+
tableSize := getTableSizeInBytes(tableInfo)
26+
27+
roundedUpAverageItemSize := calculateAverageItemSize(tableSize, numberOfItems)
28+
totalBatchSize := float64(maxItemsPerBatchRequest) * roundedUpAverageItemSize
29+
30+
writeCapacityUnits := getWriteCapacityUnits(tableInfo)
31+
rawConcurrency := float64(writeCapacityUnits) / totalBatchSize
32+
33+
concurrency := 1
34+
if rawConcurrency > 1 {
35+
//possible truncation to size of int
36+
concurrency = int(rawConcurrency)
37+
}
38+
39+
log.Printf("Given provisioned write capacity of %d, number of items %d, and table size %f KB, set concurrency to %d\n", writeCapacityUnits, numberOfItems, float64(tableSize) / float64(1024), concurrency)
40+
41+
return concurrency, nil
42+
}
43+
44+
func isOnDemand(describeTable *dynamodb.DescribeTableOutput) bool {
45+
billingModeSummary := describeTable.Table.BillingModeSummary
46+
if billingModeSummary != nil {
47+
return *describeTable.Table.BillingModeSummary.BillingMode == dynamodb.BillingModePayPerRequest
48+
}
49+
50+
return getWriteCapacityUnits(describeTable) == 0
51+
}
52+
53+
func getWriteCapacityUnits(describeTable *dynamodb.DescribeTableOutput) int64 {
54+
return *describeTable.Table.ProvisionedThroughput.WriteCapacityUnits
55+
}
56+
57+
func getNumberOfItems(describeTable *dynamodb.DescribeTableOutput) int64 {
58+
return *describeTable.Table.ItemCount
59+
}
60+
61+
func getTableSizeInBytes(describeTable *dynamodb.DescribeTableOutput) int64 {
62+
return *describeTable.Table.TableSizeBytes
63+
}
64+
65+
// getOnDemandConcurrency returns the concurrency used for on demand tables, which is the
// number of logical CPUs reported by the runtime.
func getOnDemandConcurrency() int {
	logicalCPUs := runtime.NumCPU()
	return logicalCPUs
}
69+
70+
// calculateAverageItemSize returns the average item size in kilobytes, rounded up to a
// whole number.
//
// tableSize is in bytes. When either tableSize or numberOfItems is 0 the result is 1.0,
// because those statistics aren't always updated and may legitimately read as 0.
func calculateAverageItemSize(tableSize int64, numberOfItems int64) float64 {
	const bytesPerKilobyte = 1024

	if tableSize != 0 && numberOfItems != 0 {
		//converting a very large tableSize to float64 may lose some low-order digits
		sizeInKilobytes := float64(tableSize) / bytesPerKilobyte
		return math.Ceil(sizeInKilobytes / float64(numberOfItems))
	}

	return 1.0
}

dynamo/delete.go

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package dynamo
22

33
import (
4-
"github.com/aws/aws-sdk-go/aws"
54
"github.com/aws/aws-sdk-go/service/dynamodb"
65
"github.com/halprin/delete-dynamodb-items/parallel"
76
"log"
7+
"math/rand"
88
"time"
99
)
1010

@@ -22,14 +22,33 @@ func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName st
2222

2323
dynamoItemsChunks := chunkItems(dynamoItems)
2424

25+
concurrency, err := determineConcurrency(tableName)
26+
if err != nil {
27+
log.Println("Unable determine the concurrency")
28+
return err
29+
}
30+
31+
goroutinePool := parallel.NewPool(concurrency, len(dynamoItemsChunks))
32+
defer goroutinePool.Release()
33+
2534
var errorChannels []chan error
2635

2736
for _, currentItemsChunk := range dynamoItemsChunks {
28-
errorChannel := make(chan error)
37+
38+
errorChannel := make(chan error, 1)
2939
errorChannels = append(errorChannels, errorChannel)
30-
go deleteChunkGoroutine(currentItemsChunk, tableName, errorChannel)
40+
41+
//wrapping in a function to make a copy of the currentItemsChunk and errorChannel arguments that are passed in,
42+
//else all executions try to delete the same chunk of items
43+
func(currentItemsChunk []map[string]*dynamodb.AttributeValue, errorChannel chan error) {
44+
goroutinePool.Submit(func() {
45+
deleteChunkGoroutine(currentItemsChunk, tableName, errorChannel)
46+
})
47+
}(currentItemsChunk, errorChannel)
3148
}
3249

50+
log.Println("Waiting for all deletion goroutines to complete")
51+
3352
for errorFromGoroutine := range parallel.MergeErrorChannels(errorChannels) {
3453
if errorFromGoroutine != nil {
3554
log.Println("One of the delete goroutines failed")
@@ -62,16 +81,10 @@ func deleteChunk(currentItemsChunk []map[string]*dynamodb.AttributeValue, tableN
6281
}
6382

6483
func getTableKeys(tableName string) ([]*dynamodb.KeySchemaElement, error) {
65-
describeTableInput := &dynamodb.DescribeTableInput{
66-
TableName: aws.String(tableName),
67-
}
68-
69-
tableInfo, err := dynamoService.DescribeTable(describeTableInput)
84+
tableInfo, err := describeTable(tableName)
7085
if err != nil {
71-
log.Println("Unable to describe the the table")
7286
return nil, err
7387
}
74-
7588
return tableInfo.Table.KeySchema, nil
7689
}
7790

@@ -124,7 +137,15 @@ func convertItemToKey(item map[string]*dynamodb.AttributeValue) map[string]*dyna
124137
}
125138

126139
func incrementallyBatchDelete(requestItems map[string][]*dynamodb.WriteRequest) error {
127-
millisecondsToWait := 20
140+
//used to induce jitter
141+
randomGenerator := rand.New(rand.NewSource(time.Now().UnixNano()))
142+
143+
baseMillisecondsToWait := 20
144+
maxMillisecondsToWait := 40
145+
millisecondsToWait := randomGenerator.Intn(maxMillisecondsToWait)
146+
147+
//start of waiting so all the goroutines don't call batch delete at the same time
148+
time.Sleep(time.Duration(millisecondsToWait) * time.Millisecond)
128149

129150
for {
130151
batchWriteItemInput := &dynamodb.BatchWriteItemInput{
@@ -149,9 +170,10 @@ func incrementallyBatchDelete(requestItems map[string][]*dynamodb.WriteRequest)
149170
break
150171
}
151172

152-
//do an exponential back-off
173+
//do an exponential back-off with jitter
153174
time.Sleep(time.Duration(millisecondsToWait) * time.Millisecond)
154-
millisecondsToWait *= 2
175+
maxMillisecondsToWait *= 2
176+
millisecondsToWait = baseMillisecondsToWait + randomGenerator.Intn(maxMillisecondsToWait)
155177
}
156178

157179
return nil

dynamo/dynamo.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dynamo
22

33
import (
4+
"github.com/aws/aws-sdk-go/aws"
45
"github.com/aws/aws-sdk-go/aws/session"
56
"github.com/aws/aws-sdk-go/service/dynamodb"
67
"log"
@@ -23,3 +24,17 @@ func DeleteAllItemsInTable(tableName string) error {
2324
err = deleteItems(items, tableName)
2425
return err
2526
}
27+
28+
func describeTable(tableName string) (*dynamodb.DescribeTableOutput, error) {
29+
describeTableInput := &dynamodb.DescribeTableInput{
30+
TableName: aws.String(tableName),
31+
}
32+
33+
tableInfo, err := dynamoService.DescribeTable(describeTableInput)
34+
if err != nil {
35+
log.Println("Unable to describe the the table")
36+
return nil, err
37+
}
38+
39+
return tableInfo, nil
40+
}

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
github.com/aws/aws-sdk-go v1.37.15 h1:W7l7gLLMcYRlg6a+uvf3Zz4jYwdqYzhe5ymqwWoOhp4=
2-
github.com/aws/aws-sdk-go v1.37.15/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
31
github.com/aws/aws-sdk-go v1.37.17 h1:Ga33kM38f58l7X+Z2B6JNdz9dFqxjR8AXHBbK3bXYc0=
42
github.com/aws/aws-sdk-go v1.37.17/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
53
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=

parallel/pool.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package parallel
2+
3+
import (
4+
"sync"
5+
)
6+
7+
// Pool runs submitted tasks on a fixed number of worker goroutines.
//
// Tasks travel through two channels: Submit places a task on ingestionPoolChannel, a
// single forwarding goroutine moves it onto executionPoolChannel, and the workers
// receive from executionPoolChannel and run it.
type Pool struct {
	ingestionPoolChannel chan func()
	executionPoolChannel chan func()
	// releaseOnce makes Release safe to call more than once.
	releaseOnce sync.Once
}

// NewPool creates a Pool and starts its goroutines.
//
// poolSize is the number of worker goroutines, i.e. how many tasks may run at the same
// time; values below 1 are treated as 1 so submitted tasks can always make progress.
// taskQueueSize is how many tasks Submit can queue before it blocks; negative values
// are treated as 0.
func NewPool(poolSize int, taskQueueSize int) *Pool {
	if poolSize < 1 {
		poolSize = 1
	}
	if taskQueueSize < 0 {
		taskQueueSize = 0
	}

	newPool := &Pool{
		ingestionPoolChannel: make(chan func(), taskQueueSize),
		executionPoolChannel: make(chan func(), poolSize),
	}

	go newPool.submitIngestionGoroutine()
	for workerIndex := 0; workerIndex < poolSize; workerIndex++ {
		go newPool.submitExecutionGoroutine()
	}

	return newPool
}

// Submit queues task for execution by one of the workers. It blocks while the task
// queue is full. Submit must not be called after Release, because sending on the closed
// queue panics.
func (pool *Pool) Submit(task func()) {
	pool.ingestionPoolChannel <- task
}

// Release stops the pool from accepting tasks and lets its goroutines exit once every
// task that was already submitted has run. It does not wait for those tasks to finish,
// and calling it more than once is harmless.
func (pool *Pool) Release() {
	pool.releaseOnce.Do(func() {
		// Only the ingestion channel is closed here. The forwarding goroutine may still
		// be sending queued tasks to executionPoolChannel, and closing that channel
		// from this side would make those sends panic.
		close(pool.ingestionPoolChannel)
	})
}

// submitIngestionGoroutine forwards tasks from the ingestion channel to the execution
// channel until the ingestion channel is closed and drained.
func (pool *Pool) submitIngestionGoroutine() {
	// This goroutine is the only sender on executionPoolChannel, so it is the one that
	// closes it, which in turn lets the workers finish.
	defer close(pool.executionPoolChannel)

	for submittedTask := range pool.ingestionPoolChannel {
		pool.executionPoolChannel <- submittedTask
	}
}

// submitExecutionGoroutine is the worker loop: it runs tasks from the execution channel
// until that channel is closed and drained.
func (pool *Pool) submitExecutionGoroutine() {
	for submittedTask := range pool.executionPoolChannel {
		submittedTask()
	}
}

0 commit comments

Comments
 (0)