|
24 | 24 | import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus; |
25 | 25 | import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; |
26 | 26 | import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; |
| 27 | +import software.amazon.awssdk.core.pagination.sync.SdkIterable; |
27 | 28 | import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient; |
| 29 | +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbIndex; |
28 | 30 | import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable; |
29 | 31 | import software.amazon.awssdk.enhanced.dynamodb.Key; |
30 | 32 | import software.amazon.awssdk.enhanced.dynamodb.TableSchema; |
31 | 33 | import software.amazon.awssdk.enhanced.dynamodb.model.GetItemEnhancedRequest; |
| 34 | +import software.amazon.awssdk.enhanced.dynamodb.model.Page; |
| 35 | +import software.amazon.awssdk.enhanced.dynamodb.model.QueryConditional; |
| 36 | +import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest; |
32 | 37 | import software.amazon.awssdk.services.dynamodb.DynamoDbClient; |
33 | 38 |
|
34 | 39 | import java.time.Duration; |
35 | 40 | import java.time.Instant; |
| 41 | +import java.util.List; |
36 | 42 | import java.util.Optional; |
37 | 43 | import java.util.Random; |
38 | 44 | import java.util.UUID; |
| 45 | +import java.util.concurrent.TimeUnit; |
| 46 | +import java.util.stream.Collectors; |
| 47 | +import java.util.stream.IntStream; |
| 48 | +import java.util.stream.Stream; |
39 | 49 |
|
| 50 | +import static org.awaitility.Awaitility.await; |
40 | 51 | import static org.hamcrest.MatcherAssert.assertThat; |
41 | 52 | import static org.hamcrest.Matchers.equalTo; |
42 | 53 | import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
|
48 | 59 | import static org.hamcrest.Matchers.sameInstance; |
49 | 60 | import static org.junit.jupiter.api.Assertions.assertThrows; |
50 | 61 | import static org.mockito.Mockito.when; |
| 62 | +import static org.opensearch.dataprepper.plugins.sourcecoordinator.dynamodb.DynamoDbClientWrapper.SOURCE_STATUS_COMBINATION_KEY_GLOBAL_SECONDARY_INDEX; |
| 63 | +import static org.opensearch.dataprepper.plugins.sourcecoordinator.dynamodb.DynamoDbSourceCoordinationStore.SOURCE_STATUS_COMBINATION_KEY_FORMAT; |
51 | 64 |
|
52 | 65 | @ExtendWith(MockitoExtension.class) |
53 | 66 | class DynamoDbSourceCoordinationStoreIT { |
@@ -91,8 +104,8 @@ void setUp() { |
91 | 104 | when(dynamoStoreSettings.getRegion()).thenReturn(region); |
92 | 105 | when(dynamoStoreSettings.getStsRoleArn()).thenReturn(stsRoleArn); |
93 | 106 | when(dynamoStoreSettings.getStsExternalId()).thenReturn(stsExternalId); |
94 | | - when(dynamoStoreSettings.getProvisionedReadCapacityUnits()).thenReturn(1L); |
95 | | - when(dynamoStoreSettings.getProvisionedWriteCapacityUnits()).thenReturn(1L); |
| 107 | + when(dynamoStoreSettings.getProvisionedReadCapacityUnits()).thenReturn(10L); |
| 108 | + when(dynamoStoreSettings.getProvisionedWriteCapacityUnits()).thenReturn(10L); |
96 | 109 |
|
97 | 110 | when(dynamoDbClientFactory.provideDynamoDbClient(region, stsRoleArn, stsExternalId)).thenReturn(dynamoDbClient); |
98 | 111 |
|
@@ -250,36 +263,56 @@ void tryCreatePartitionItem_creates_an_item() { |
250 | 263 | } |
251 | 264 |
|
252 | 265 | @Test |
253 | | - void tryAcquireAvailablePartition_gets_first_unassigned_partition() { |
| 266 | + void tryAcquireAvailablePartition_gets_first_unassigned_partition() throws InterruptedException { |
254 | 267 | final DynamoDbSourceCoordinationStore objectUnderTest = createObjectUnderTest(); |
255 | 268 | final String partitionProgressState = UUID.randomUUID().toString(); |
256 | 269 |
|
257 | | - final String unassignedPartitionKey1 = UUID.randomUUID().toString(); |
258 | | - final String unassignedPartitionKey2 = UUID.randomUUID().toString(); |
259 | | - final String unassignedPartitionKey3 = UUID.randomUUID().toString(); |
| 270 | + final List<String> partitionKeys = IntStream.rangeClosed(1, 3) |
| 271 | + .mapToObj(i -> UUID.randomUUID() + "_" + i) |
| 272 | + .collect(Collectors.toList()); |
260 | 273 |
|
261 | | - objectUnderTest.tryCreatePartitionItem(sourceIdentifier, |
262 | | - unassignedPartitionKey1, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false); |
263 | | - objectUnderTest.tryCreatePartitionItem(sourceIdentifier, |
264 | | - unassignedPartitionKey2, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false); |
265 | | - objectUnderTest.tryCreatePartitionItem(sourceIdentifier, |
266 | | - unassignedPartitionKey3, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false); |
| 274 | + for (final String partitionKey : partitionKeys) { |
| 275 | + final boolean createSuccess = objectUnderTest.tryCreatePartitionItem(sourceIdentifier, |
| 276 | + partitionKey, SourcePartitionStatus.UNASSIGNED, 1L, partitionProgressState, false); |
| 277 | + assertThat(createSuccess, equalTo(true)); |
| 278 | + Thread.sleep(150); |
| 279 | + } |
267 | 280 |
|
268 | | - final Optional<SourcePartitionStoreItem> maybeAcquired = |
269 | | - objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20)); |
| 281 | + await().atMost(10, TimeUnit.SECONDS).until(() -> { |
| 282 | + final String primaryKey = String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED); |
| 283 | + final Stream<DynamoDbSourcePartitionItem> items = querySourceStatusIndex(primaryKey); |
| 284 | + |
| 285 | + return items.count() == partitionKeys.size(); |
| 286 | + }); |
| 287 | + |
| 288 | + final Optional<SourcePartitionStoreItem> maybeAcquired = objectUnderTest.tryAcquireAvailablePartition(sourceIdentifier, ownerId, Duration.ofSeconds(20)); |
270 | 289 |
|
271 | 290 | assertThat(maybeAcquired, notNullValue()); |
272 | 291 | assertThat(maybeAcquired.isPresent(), equalTo(true)); |
273 | | - |
274 | 292 | final SourcePartitionStoreItem acquiredItem = maybeAcquired.get(); |
275 | 293 |
|
276 | 294 | assertThat(acquiredItem, notNullValue()); |
| 295 | + final String unassignedPartitionKey1 = partitionKeys.get(0); |
| 296 | + |
277 | 297 | assertThat(acquiredItem.getSourceIdentifier(), equalTo(sourceIdentifier)); |
278 | 298 | assertThat(acquiredItem.getSourcePartitionKey(), equalTo(unassignedPartitionKey1)); |
279 | 299 | assertThat(acquiredItem.getSourcePartitionStatus(), equalTo(SourcePartitionStatus.ASSIGNED)); |
280 | 300 | assertThat(acquiredItem.getPartitionOwner(), equalTo(ownerId)); |
281 | 301 | } |
282 | 302 |
|
| 303 | + private Stream<DynamoDbSourcePartitionItem> querySourceStatusIndex(final String partitionKey) { |
| 304 | + final DynamoDbIndex<DynamoDbSourcePartitionItem> sourceStatusIndex = table.index(SOURCE_STATUS_COMBINATION_KEY_GLOBAL_SECONDARY_INDEX); |
| 305 | + final QueryEnhancedRequest queryEnhancedRequest = QueryEnhancedRequest.builder() |
| 306 | + .limit(1) |
| 307 | + .queryConditional(QueryConditional.keyEqualTo(Key.builder().partitionValue(partitionKey).build())) |
| 308 | + .build(); |
| 309 | + |
| 310 | + final SdkIterable<Page<DynamoDbSourcePartitionItem>> pages = sourceStatusIndex.query(queryEnhancedRequest); |
| 311 | + |
| 312 | + return pages.stream() |
| 313 | + .flatMap(page -> page.items().stream()); |
| 314 | + } |
| 315 | + |
283 | 316 | private DynamoDbSourcePartitionItem putDynamoDbSourcePartitionItem(final SourcePartitionStatus sourcePartitionStatus) { |
284 | 317 | final DynamoDbSourcePartitionItem putItem = createUnsavedPartitionItem(sourcePartitionStatus); |
285 | 318 | table.putItem(putItem); |
|
0 commit comments