Skip to content

Commit bcd521f

Browse files
authored
Merge pull request #12 from halprin/stream-scan-request
Stream Scan Request
2 parents f3ebbd2 + 1445047 commit bcd521f

5 files changed

Lines changed: 43 additions & 38 deletions

File tree

dynamo/delete.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
var maxItemsPerBatchRequest = 25
1212
var tableKeys []*dynamodb.KeySchemaElement
1313

14-
func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName string) error {
14+
func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName string, goroutinePool *parallel.Pool) error {
1515

1616
var err error
1717
tableKeys, err = getTableKeys(tableName)
@@ -22,15 +22,6 @@ func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName st
2222

2323
dynamoItemsChunks := chunkItems(dynamoItems)
2424

25-
concurrency, err := determineConcurrency(tableName)
26-
if err != nil {
27-
log.Println("Unable determine the concurrency")
28-
return err
29-
}
30-
31-
goroutinePool := parallel.NewPool(concurrency, len(dynamoItemsChunks))
32-
defer goroutinePool.Release()
33-
3425
var errorChannels []chan error
3526

3627
for _, currentItemsChunk := range dynamoItemsChunks {

dynamo/dynamo.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"github.com/aws/aws-sdk-go/aws/session"
66
"github.com/aws/aws-sdk-go/service/dynamodb"
77
"github.com/halprin/delete-dynamodb-items/config"
8+
"github.com/halprin/delete-dynamodb-items/parallel"
89
"log"
910
)
1011

@@ -25,13 +26,24 @@ func DeleteAllItemsInTable() error {
2526

2627
tableName := *config.GetTableName()
2728

28-
items, err := getItems(tableName)
29+
concurrency, err := determineConcurrency(tableName)
2930
if err != nil {
31+
log.Println("Unable determine the concurrency")
3032
return err
3133
}
3234

33-
err = deleteItems(items, tableName)
34-
return err
35+
// 1024 * 1024 / 25 = 41,943.04 ~= 41,944
36+
goroutinePool := parallel.NewPool(concurrency, 41944)
37+
defer goroutinePool.Release()
38+
39+
for subItemList := range getItemsGoroutine(tableName) {
40+
err = deleteItems(subItemList, tableName, goroutinePool)
41+
if err != nil {
42+
return err
43+
}
44+
}
45+
46+
return nil
3547
}
3648

3749
func describeTable(tableName string) (*dynamodb.DescribeTableOutput, error) {

dynamo/scan.go

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,34 +6,35 @@ import (
66
"log"
77
)
88

9-
func getItems(tableName string) ([]map[string]*dynamodb.AttributeValue, error) {
9+
func getItemsGoroutine(tableName string) chan []map[string]*dynamodb.AttributeValue {
10+
yield := make(chan []map[string]*dynamodb.AttributeValue)
1011

11-
var scannedItems []map[string]*dynamodb.AttributeValue
12-
13-
scanInput := &dynamodb.ScanInput{
14-
TableName: aws.String(tableName),
15-
}
12+
go func() {
13+
scanInput := &dynamodb.ScanInput{
14+
TableName: aws.String(tableName),
15+
}
1616

17-
for {
18-
log.Println("Scanning items")
17+
for {
18+
log.Println("Scanning items")
1919

20-
scanOutput, err := dynamoService.Scan(scanInput)
21-
if err != nil {
22-
log.Println("Failed to scan the items")
23-
return nil, err
24-
}
20+
scanOutput, err := dynamoService.Scan(scanInput)
21+
if err != nil {
22+
log.Println("Failed to scan the items")
23+
break
24+
}
2525

26-
scannedItems = append(scannedItems, scanOutput.Items...)
26+
yield <- scanOutput.Items
2727

28-
if scanOutput.LastEvaluatedKey != nil && len(scanOutput.LastEvaluatedKey) > 0 {
29-
//there are still items to scan, set the key to start scanning from again
30-
scanInput.ExclusiveStartKey = scanOutput.LastEvaluatedKey
31-
} else {
32-
//no more items to scan, break out
33-
break
28+
if scanOutput.LastEvaluatedKey != nil && len(scanOutput.LastEvaluatedKey) > 0 {
29+
//there are still items to scan, set the key to start scanning from again
30+
scanInput.ExclusiveStartKey = scanOutput.LastEvaluatedKey
31+
} else {
32+
//no more items to scan, break out
33+
break
34+
}
3435
}
35-
}
36+
close(yield)
37+
}()
3638

37-
return scannedItems, nil
39+
return yield
3840
}
39-

generate_mass_data.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ aws dynamodb create-table --table-name "${table_name}" --attribute-definitions A
99
items_preamble="{\"${table_name}\": ["
1010
items_middle=""
1111
items_ending=']}'
12+
lorem_ipsum='Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec a efficitur nunc. Morbi fermentum sem metus, vel venenatis leo porttitor quis. Etiam maximus neque a pharetra viverra. Sed turpis lacus, blandit ac tortor elementum, scelerisque feugiat risus. Nam malesuada augue et purus aliquet, et semper dolor cursus. Suspendisse volutpat dolor nec efficitur rutrum. Aliquam leo libero, posuere eget vulputate in, luctus nec nibh. Donec eu tellus eu libero scelerisque molestie. Ut sed pretium nibh. Donec suscipit eget dui quis lacinia. Aliquam non pulvinar massa, nec blandit lectus. Cras sollicitudin rhoncus ex. Nunc ipsum dui, dictum in risus nec, convallis rutrum justo. In tempor dui nisl, in fringilla massa vehicula ac. Donec a ipsum luctus, venenatis magna ut, venenatis risus. Vivamus eu dapibus odio. Aenean dapibus urna orci, sed pharetra nunc dapibus ac. Praesent ornare, felis sit amet mattis faucibus, odio arcu laoreet arcu, eu blandit nisi turpis cursus enim.'
1213

1314
for ((index = 1 ; index <= num_items ; index++)); do
14-
current_request="{\"PutRequest\": {\"Item\": {\"id\": {\"S\": \"$(uuidgen)\"}}}}"
15+
current_request="{\"PutRequest\": {\"Item\": {\"id\": {\"S\": \"$(uuidgen)\"}, \"text\": {\"S\": \"${lorem_ipsum}\"}}}}"
1516
items_middle="${items_middle}${current_request},"
1617
if [[ $((index % 25)) == 0 ]]; then
1718
items_middle=${items_middle::${#items_middle}-1}

parallel/pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ type Pool struct {
1010
waitGroup sync.WaitGroup
1111
}
1212

13-
//poolSize needs to be bigger than taskQueueSize
13+
//taskQueueSize needs to be bigger than poolSize if you want to saturate the pool
1414
func NewPool(poolSize int, taskQueueSize int) *Pool {
1515
newPool := &Pool{
1616
ingestionPoolChannel: make(chan func(), taskQueueSize),

0 commit comments

Comments
 (0)