Skip to content

Commit 1445047

Browse files
committed
Stream the scan results 1 page at a time so we don't load the entire table into memory
1 parent e96c9a4 commit 1445047

4 files changed

Lines changed: 41 additions & 37 deletions

File tree

dynamo/delete.go

Lines changed: 1 addition & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ import (
1111
var maxItemsPerBatchRequest = 25
1212
var tableKeys []*dynamodb.KeySchemaElement
1313

14-
func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName string) error {
14+
func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName string, goroutinePool *parallel.Pool) error {
1515

1616
var err error
1717
tableKeys, err = getTableKeys(tableName)
@@ -22,15 +22,6 @@ func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName st
2222

2323
dynamoItemsChunks := chunkItems(dynamoItems)
2424

25-
concurrency, err := determineConcurrency(tableName)
26-
if err != nil {
27-
log.Println("Unable determine the concurrency")
28-
return err
29-
}
30-
31-
goroutinePool := parallel.NewPool(concurrency, len(dynamoItemsChunks))
32-
defer goroutinePool.Release()
33-
3425
var errorChannels []chan error
3526

3627
for _, currentItemsChunk := range dynamoItemsChunks {

dynamo/dynamo.go

Lines changed: 15 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@ import (
55
"github.com/aws/aws-sdk-go/aws/session"
66
"github.com/aws/aws-sdk-go/service/dynamodb"
77
"github.com/halprin/delete-dynamodb-items/config"
8+
"github.com/halprin/delete-dynamodb-items/parallel"
89
"log"
910
)
1011

@@ -25,13 +26,24 @@ func DeleteAllItemsInTable() error {
2526

2627
tableName := *config.GetTableName()
2728

28-
items, err := getItems(tableName)
29+
concurrency, err := determineConcurrency(tableName)
2930
if err != nil {
31+
log.Println("Unable determine the concurrency")
3032
return err
3133
}
3234

33-
err = deleteItems(items, tableName)
34-
return err
35+
// 1024 * 1024 / 25 = 41,943.04, rounded up to 41,944
36+
goroutinePool := parallel.NewPool(concurrency, 41944)
37+
defer goroutinePool.Release()
38+
39+
for subItemList := range getItemsGoroutine(tableName) {
40+
err = deleteItems(subItemList, tableName, goroutinePool)
41+
if err != nil {
42+
return err
43+
}
44+
}
45+
46+
return nil
3547
}
3648

3749
func describeTable(tableName string) (*dynamodb.DescribeTableOutput, error) {

dynamo/scan.go

Lines changed: 24 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -6,34 +6,35 @@ import (
66
"log"
77
)
88

9-
func getItems(tableName string) ([]map[string]*dynamodb.AttributeValue, error) {
9+
func getItemsGoroutine(tableName string) chan []map[string]*dynamodb.AttributeValue {
10+
yield := make(chan []map[string]*dynamodb.AttributeValue)
1011

11-
var scannedItems []map[string]*dynamodb.AttributeValue
12-
13-
scanInput := &dynamodb.ScanInput{
14-
TableName: aws.String(tableName),
15-
}
12+
go func() {
13+
scanInput := &dynamodb.ScanInput{
14+
TableName: aws.String(tableName),
15+
}
1616

17-
for {
18-
log.Println("Scanning items")
17+
for {
18+
log.Println("Scanning items")
1919

20-
scanOutput, err := dynamoService.Scan(scanInput)
21-
if err != nil {
22-
log.Println("Failed to scan the items")
23-
return nil, err
24-
}
20+
scanOutput, err := dynamoService.Scan(scanInput)
21+
if err != nil {
22+
log.Println("Failed to scan the items")
23+
break
24+
}
2525

26-
scannedItems = append(scannedItems, scanOutput.Items...)
26+
yield <- scanOutput.Items
2727

28-
if scanOutput.LastEvaluatedKey != nil && len(scanOutput.LastEvaluatedKey) > 0 {
29-
//there are still items to scan, set the key to start scanning from again
30-
scanInput.ExclusiveStartKey = scanOutput.LastEvaluatedKey
31-
} else {
32-
//no more items to scan, break out
33-
break
28+
if scanOutput.LastEvaluatedKey != nil && len(scanOutput.LastEvaluatedKey) > 0 {
29+
//there are still items to scan, set the key to start scanning from again
30+
scanInput.ExclusiveStartKey = scanOutput.LastEvaluatedKey
31+
} else {
32+
//no more items to scan, break out
33+
break
34+
}
3435
}
35-
}
36+
close(yield)
37+
}()
3638

37-
return scannedItems, nil
39+
return yield
3840
}
39-

parallel/pool.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,7 @@ type Pool struct {
1010
waitGroup sync.WaitGroup
1111
}
1212

13-
//poolSize needs to be bigger than taskQueueSize
13+
//taskQueueSize needs to be bigger than poolSize if you want to saturate the pool
1414
func NewPool(poolSize int, taskQueueSize int) *Pool {
1515
newPool := &Pool{
1616
ingestionPoolChannel: make(chan func(), taskQueueSize),

0 commit comments

Comments
 (0)