Skip to content

Commit 6905c19

Browse files
Copilotbootjp
andcommitted
Add concurrent transaction tests for DynamoDB adapter
Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
1 parent 2307187 commit 6905c19

1 file changed

Lines changed: 183 additions & 0 deletions

File tree

adapter/dynamodb_test.go

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package adapter
22

33
import (
44
"context"
5+
"strconv"
6+
"sync"
57
"testing"
68

79
"github.com/aws/aws-sdk-go/aws"
@@ -171,3 +173,184 @@ func TestDynamoDB_UpdateItem_Condition(t *testing.T) {
171173
})
172174
assert.Error(t, err)
173175
}
176+
177+
func TestDynamoDB_TransactWriteItems_Concurrent(t *testing.T) {
178+
t.Parallel()
179+
nodes, _, _ := createNode(t, 1)
180+
defer shutdown(nodes)
181+
182+
sess, err := session.NewSession(&aws.Config{
183+
Region: aws.String("us-west-2"),
184+
Endpoint: aws.String("http://" + nodes[0].dynamoAddress),
185+
Credentials: credentials.NewStaticCredentials("dummy", "dummy", ""),
186+
DisableSSL: aws.Bool(true),
187+
})
188+
assert.NoError(t, err)
189+
190+
client := dynamodb.New(sess)
191+
192+
wg := &sync.WaitGroup{}
193+
numGoroutines := 100
194+
195+
for i := 0; i < numGoroutines; i++ {
196+
wg.Add(1)
197+
go func(i int) {
198+
defer wg.Done()
199+
200+
keyPrefix := "concurrent-txn-" + strconv.Itoa(i)
201+
key1 := keyPrefix + "-k1"
202+
key2 := keyPrefix + "-k2"
203+
value1 := "v1-" + strconv.Itoa(i)
204+
value2 := "v2-" + strconv.Itoa(i)
205+
206+
// Perform transaction with two put operations
207+
_, err := client.TransactWriteItemsWithContext(context.Background(), &dynamodb.TransactWriteItemsInput{
208+
TransactItems: []*dynamodb.TransactWriteItem{
209+
{
210+
Put: &dynamodb.Put{
211+
TableName: aws.String("t"),
212+
Item: map[string]*dynamodb.AttributeValue{
213+
"key": {S: aws.String(key1)},
214+
"value": {S: aws.String(value1)},
215+
},
216+
},
217+
},
218+
{
219+
Put: &dynamodb.Put{
220+
TableName: aws.String("t"),
221+
Item: map[string]*dynamodb.AttributeValue{
222+
"key": {S: aws.String(key2)},
223+
"value": {S: aws.String(value2)},
224+
},
225+
},
226+
},
227+
},
228+
})
229+
assert.NoError(t, err, "Transaction failed for goroutine %d", i)
230+
231+
// Verify both items were written correctly
232+
out1, err := client.GetItemWithContext(context.Background(), &dynamodb.GetItemInput{
233+
TableName: aws.String("t"),
234+
Key: map[string]*dynamodb.AttributeValue{
235+
"key": {S: aws.String(key1)},
236+
},
237+
})
238+
assert.NoError(t, err, "Get failed for key1 in goroutine %d", i)
239+
assert.Equal(t, value1, aws.StringValue(out1.Item["value"].S), "Value mismatch for key1 in goroutine %d", i)
240+
241+
out2, err := client.GetItemWithContext(context.Background(), &dynamodb.GetItemInput{
242+
TableName: aws.String("t"),
243+
Key: map[string]*dynamodb.AttributeValue{
244+
"key": {S: aws.String(key2)},
245+
},
246+
})
247+
assert.NoError(t, err, "Get failed for key2 in goroutine %d", i)
248+
assert.Equal(t, value2, aws.StringValue(out2.Item["value"].S), "Value mismatch for key2 in goroutine %d", i)
249+
}(i)
250+
}
251+
252+
wg.Wait()
253+
}
254+
255+
func TestDynamoDB_TransactWriteItems_Concurrent_Conflicting(t *testing.T) {
256+
t.Parallel()
257+
nodes, _, _ := createNode(t, 1)
258+
defer shutdown(nodes)
259+
260+
sess, err := session.NewSession(&aws.Config{
261+
Region: aws.String("us-west-2"),
262+
Endpoint: aws.String("http://" + nodes[0].dynamoAddress),
263+
Credentials: credentials.NewStaticCredentials("dummy", "dummy", ""),
264+
DisableSSL: aws.Bool(true),
265+
})
266+
assert.NoError(t, err)
267+
268+
client := dynamodb.New(sess)
269+
270+
// Initialize some base keys that will be updated concurrently
271+
baseKeys := []string{"shared-key-1", "shared-key-2", "shared-key-3"}
272+
for _, key := range baseKeys {
273+
_, err := client.PutItemWithContext(context.Background(), &dynamodb.PutItemInput{
274+
TableName: aws.String("t"),
275+
Item: map[string]*dynamodb.AttributeValue{
276+
"key": {S: aws.String(key)},
277+
"value": {S: aws.String("initial")},
278+
"counter": {N: aws.String("0")},
279+
},
280+
})
281+
assert.NoError(t, err)
282+
}
283+
284+
wg := &sync.WaitGroup{}
285+
numGoroutines := 50
286+
287+
for i := 0; i < numGoroutines; i++ {
288+
wg.Add(1)
289+
go func(i int) {
290+
defer wg.Done()
291+
292+
// Each goroutine attempts to update multiple shared keys in a transaction
293+
counterValue := strconv.Itoa(i)
294+
295+
_, err := client.TransactWriteItemsWithContext(context.Background(), &dynamodb.TransactWriteItemsInput{
296+
TransactItems: []*dynamodb.TransactWriteItem{
297+
{
298+
Put: &dynamodb.Put{
299+
TableName: aws.String("t"),
300+
Item: map[string]*dynamodb.AttributeValue{
301+
"key": {S: aws.String(baseKeys[0])},
302+
"value": {S: aws.String("updated-by-" + counterValue)},
303+
"counter": {N: aws.String(counterValue)},
304+
},
305+
},
306+
},
307+
{
308+
Put: &dynamodb.Put{
309+
TableName: aws.String("t"),
310+
Item: map[string]*dynamodb.AttributeValue{
311+
"key": {S: aws.String(baseKeys[1])},
312+
"value": {S: aws.String("updated-by-" + counterValue)},
313+
"counter": {N: aws.String(counterValue)},
314+
},
315+
},
316+
},
317+
{
318+
Put: &dynamodb.Put{
319+
TableName: aws.String("t"),
320+
Item: map[string]*dynamodb.AttributeValue{
321+
"key": {S: aws.String(baseKeys[2])},
322+
"value": {S: aws.String("updated-by-" + counterValue)},
323+
"counter": {N: aws.String(counterValue)},
324+
},
325+
},
326+
},
327+
},
328+
})
329+
assert.NoError(t, err, "Transaction failed for goroutine %d", i)
330+
}(i)
331+
}
332+
333+
wg.Wait()
334+
335+
// Verify that all keys have been updated and have consistent values
336+
// Due to the concurrent nature, we can't predict which goroutine will win,
337+
// but we can verify that each key has valid data
338+
for _, key := range baseKeys {
339+
out, err := client.GetItemWithContext(context.Background(), &dynamodb.GetItemInput{
340+
TableName: aws.String("t"),
341+
Key: map[string]*dynamodb.AttributeValue{
342+
"key": {S: aws.String(key)},
343+
},
344+
})
345+
assert.NoError(t, err, "Get failed for key %s", key)
346+
assert.NotNil(t, out.Item, "Item should exist for key %s", key)
347+
348+
if out.Item != nil && out.Item["value"] != nil && out.Item["counter"] != nil {
349+
value := aws.StringValue(out.Item["value"].S)
350+
counter := aws.StringValue(out.Item["counter"].N)
351+
352+
// Verify that the value and counter are consistent (both from the same goroutine)
353+
assert.Contains(t, value, "updated-by-"+counter, "Value and counter should be consistent for key %s", key)
354+
}
355+
}
356+
}

0 commit comments

Comments
 (0)