Skip to content

Commit fced583

Browse files
authored
Merge pull request #1 from halprin/mass-dump
Initial PR that puts everything for MVP
2 parents 5c37c11 + c1f91ae commit fced583

7 files changed

Lines changed: 315 additions & 0 deletions

File tree

cmd/main.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package main
2+
3+
import (
4+
"errors"
5+
"github.com/halprin/delete-dynamodb-items/dynamo"
6+
"log"
7+
"os"
8+
)
9+
10+
func main() {
11+
log.Println("Start")
12+
13+
tableName, err := getTableName()
14+
if err != nil {
15+
killExecution(err)
16+
}
17+
18+
err = dynamo.DeleteAllItemsInTable(tableName)
19+
if err != nil {
20+
killExecution(err)
21+
}
22+
log.Println("Complete")
23+
}
24+
25+
func getTableName() (string, error) {
26+
if len(os.Args) < 2 {
27+
return "", errors.New("Provide a table name for the first argument")
28+
}
29+
return os.Args[1], nil
30+
}
31+
32+
func killExecution(err error) {
33+
log.Println("Failure")
34+
log.Fatal(err.Error())
35+
}

dynamo/delete.go

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package dynamo
2+
3+
import (
4+
"github.com/aws/aws-sdk-go/aws"
5+
"github.com/aws/aws-sdk-go/service/dynamodb"
6+
"github.com/halprin/delete-dynamodb-items/parallel"
7+
"log"
8+
"time"
9+
)
10+
11+
var maxItemsPerBatchRequest = 25
12+
var tableKeys []*dynamodb.KeySchemaElement
13+
14+
func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName string) error {
15+
16+
var err error
17+
tableKeys, err = getTableKeys(tableName)
18+
if err != nil {
19+
log.Println("Unable to determine the keys of the table")
20+
return err
21+
}
22+
23+
dynamoItemsChunks := chunkItems(dynamoItems)
24+
25+
var errorChannels []chan error
26+
27+
for _, currentItemsChunk := range dynamoItemsChunks {
28+
errorChannel := make(chan error)
29+
errorChannels = append(errorChannels, errorChannel)
30+
go deleteChunkGoroutine(currentItemsChunk, tableName, errorChannel)
31+
}
32+
33+
for errorFromGoroutine := range parallel.MergeErrorChannels(errorChannels) {
34+
if errorFromGoroutine != nil {
35+
log.Println("One of the delete goroutines failed")
36+
return errorFromGoroutine
37+
}
38+
}
39+
40+
return nil
41+
}
42+
43+
func deleteChunkGoroutine(currentItemsChunk []map[string]*dynamodb.AttributeValue, tableName string, errorChannel chan error) {
44+
errorChannel <- deleteChunk(currentItemsChunk, tableName)
45+
close(errorChannel)
46+
}
47+
48+
func deleteChunk(currentItemsChunk []map[string]*dynamodb.AttributeValue, tableName string) error {
49+
writeRequests := marshalItemsIntoBatchWrites(currentItemsChunk)
50+
51+
requestItems := map[string][]*dynamodb.WriteRequest{
52+
tableName: writeRequests,
53+
}
54+
55+
err := incrementallyBatchDelete(requestItems)
56+
if err != nil {
57+
log.Println("Failed to batch delete items")
58+
return err
59+
}
60+
61+
return nil
62+
}
63+
64+
func getTableKeys(tableName string) ([]*dynamodb.KeySchemaElement, error) {
65+
describeTableInput := &dynamodb.DescribeTableInput{
66+
TableName: aws.String(tableName),
67+
}
68+
69+
tableInfo, err := dynamoService.DescribeTable(describeTableInput)
70+
if err != nil {
71+
log.Println("Unable to describe the the table")
72+
return nil, err
73+
}
74+
75+
return tableInfo.Table.KeySchema, nil
76+
}
77+
78+
func chunkItems(dynamoItems []map[string]*dynamodb.AttributeValue) [][]map[string]*dynamodb.AttributeValue {
79+
var itemChunks [][]map[string]*dynamodb.AttributeValue
80+
numberOfItems := len(dynamoItems)
81+
82+
for itemIndex := 0; itemIndex < numberOfItems; itemIndex += maxItemsPerBatchRequest {
83+
end := itemIndex + maxItemsPerBatchRequest
84+
85+
if end > numberOfItems {
86+
end = numberOfItems
87+
}
88+
89+
itemChunks = append(itemChunks, dynamoItems[itemIndex:end])
90+
}
91+
92+
return itemChunks
93+
}
94+
95+
func marshalItemsIntoBatchWrites(dynamoItems []map[string]*dynamodb.AttributeValue) []*dynamodb.WriteRequest {
96+
var writeRequests []*dynamodb.WriteRequest
97+
var writeRequest *dynamodb.WriteRequest
98+
99+
for _, currentDynamoItem := range dynamoItems {
100+
key := convertItemToKey(currentDynamoItem)
101+
102+
deleteRequest := &dynamodb.DeleteRequest{
103+
Key: key,
104+
}
105+
106+
writeRequest = &dynamodb.WriteRequest{
107+
DeleteRequest: deleteRequest,
108+
}
109+
110+
writeRequests = append(writeRequests, writeRequest)
111+
}
112+
113+
return writeRequests
114+
}
115+
116+
func convertItemToKey(item map[string]*dynamodb.AttributeValue) map[string]*dynamodb.AttributeValue {
117+
key := make(map[string]*dynamodb.AttributeValue)
118+
for _, currentTableKey := range tableKeys {
119+
currentTableKeyName := *currentTableKey.AttributeName
120+
key[currentTableKeyName] = item[currentTableKeyName]
121+
}
122+
123+
return key
124+
}
125+
126+
func incrementallyBatchDelete(requestItems map[string][]*dynamodb.WriteRequest) error {
127+
millisecondsToWait := 20
128+
129+
for {
130+
batchWriteItemInput := &dynamodb.BatchWriteItemInput{
131+
RequestItems: requestItems,
132+
}
133+
134+
log.Println("Deleting some items")
135+
136+
batchWriteItemOutput, err := dynamoService.BatchWriteItem(batchWriteItemInput)
137+
if err != nil {
138+
//there was an error writing to DynamoDB
139+
log.Println("Failed to put/delete items in DynamoDB")
140+
return err
141+
}
142+
143+
if len(batchWriteItemOutput.UnprocessedItems) > 0 {
144+
//there are still items to write, reset requestItems for the next pass
145+
log.Println("Unprocessed items remain, trying again with remaining items")
146+
requestItems = batchWriteItemOutput.UnprocessedItems
147+
} else {
148+
//no more items to write, break out
149+
break
150+
}
151+
152+
//do an exponential back-off
153+
time.Sleep(time.Duration(millisecondsToWait) * time.Millisecond)
154+
millisecondsToWait *= 2
155+
}
156+
157+
return nil
158+
}

dynamo/dynamo.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package dynamo
2+
3+
import (
4+
"github.com/aws/aws-sdk-go/aws/session"
5+
"github.com/aws/aws-sdk-go/service/dynamodb"
6+
"log"
7+
)
8+
9+
var awsSession, sessionErr = session.NewSession()
10+
var dynamoService = dynamodb.New(awsSession)
11+
12+
func DeleteAllItemsInTable(tableName string) error {
13+
if sessionErr != nil {
14+
log.Println("Initial AWS session failed")
15+
return sessionErr
16+
}
17+
18+
items, err := getItems(tableName)
19+
if err != nil {
20+
return err
21+
}
22+
23+
err = deleteItems(items, tableName)
24+
return err
25+
}

dynamo/scan.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package dynamo
2+
3+
import (
4+
"github.com/aws/aws-sdk-go/aws"
5+
"github.com/aws/aws-sdk-go/service/dynamodb"
6+
"log"
7+
)
8+
9+
func getItems(tableName string) ([]map[string]*dynamodb.AttributeValue, error) {
10+
11+
var scannedItems []map[string]*dynamodb.AttributeValue
12+
13+
scanInput := &dynamodb.ScanInput{
14+
TableName: aws.String(tableName),
15+
}
16+
17+
for {
18+
log.Println("Scanning items")
19+
20+
scanOutput, err := dynamoService.Scan(scanInput)
21+
if err != nil {
22+
log.Println("Failed to scan the items")
23+
return nil, err
24+
}
25+
26+
scannedItems = append(scannedItems, scanOutput.Items...)
27+
28+
if scanOutput.LastEvaluatedKey != nil && len(scanOutput.LastEvaluatedKey) > 0 {
29+
//there are still items to scan, the the key to start scanning from again
30+
scanInput.ExclusiveStartKey = scanOutput.LastEvaluatedKey
31+
} else {
32+
//no more items to scan, break out
33+
break
34+
}
35+
}
36+
37+
return scannedItems, nil
38+
}
39+

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module github.com/halprin/delete-dynamodb-items
2+
3+
go 1.15
4+
5+
require github.com/aws/aws-sdk-go v1.37.15

go.sum

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
github.com/aws/aws-sdk-go v1.37.15 h1:W7l7gLLMcYRlg6a+uvf3Zz4jYwdqYzhe5ymqwWoOhp4=
2+
github.com/aws/aws-sdk-go v1.37.15/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
3+
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
4+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
5+
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
6+
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
7+
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
8+
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
9+
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
10+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
11+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
12+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
13+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
14+
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
15+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
16+
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME=
17+
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
18+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
19+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
20+
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
21+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
22+
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
23+
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
24+
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
25+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
26+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
27+
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
28+
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

parallel/channels.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package parallel
2+
3+
import "sync"
4+
5+
func MergeErrorChannels(inputChannels []chan error) <-chan error {
6+
outChannel := make(chan error)
7+
var waitGroup sync.WaitGroup
8+
waitGroup.Add(len(inputChannels))
9+
10+
for _, currentInputChannel := range inputChannels {
11+
go func(currentInputChannelCopy <-chan error) {
12+
for valueFromInputChannel := range currentInputChannelCopy {
13+
outChannel <- valueFromInputChannel
14+
}
15+
waitGroup.Done()
16+
}(currentInputChannel)
17+
}
18+
19+
go func() {
20+
waitGroup.Wait()
21+
close(outChannel)
22+
}()
23+
24+
return outChannel
25+
}

0 commit comments

Comments
 (0)