Skip to content

Commit 9ab26a9

Browse files
committed
Fix s3 folder scan depth for RDS source when partition prefix is not set
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent 506a04f commit 9ab26a9

2 files changed

Lines changed: 29 additions & 1 deletion

File tree

data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,13 +427,19 @@ public String calculateDepth(String s3Prefix) {
427427
return Integer.toString(getDepth(s3Prefix, 4));
428428
}
429429

430+
public String getSourceCoordinationIdentifier() {
431+
return System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
432+
}
433+
430434
/**
431435
* Calculate s3 folder scan depth for RDS source pipeline
432436
* @param s3Prefix: s3 prefix defined in the source configuration
433437
* @return s3 folder scan depth
434438
*/
435439
public String calculateDepthForRdsSource(String s3Prefix) {
436-
return Integer.toString(getDepth(s3Prefix, 3));
440+
String envSourceCoordinationIdentifier = getSourceCoordinationIdentifier();
441+
int baseDepth = envSourceCoordinationIdentifier != null ? 3 : 2;
442+
return Integer.toString(getDepth(s3Prefix, baseDepth));
437443
}
438444

439445
private int getDepth(String s3Prefix, int baseDepth) {

data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,11 @@
3737
import java.util.stream.Stream;
3838

3939
import static org.assertj.core.api.Assertions.assertThat;
40+
import static org.junit.jupiter.api.Assertions.assertEquals;
4041
import static org.junit.jupiter.api.Assertions.assertThrows;
42+
import static org.mockito.Mockito.doReturn;
4143
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.spy;
4245
import static org.mockito.Mockito.when;
4346

4447
class DynamicConfigTransformerTest {
@@ -456,4 +459,23 @@ void test_overlay_directive_overrides_existing_fields() throws Exception {
456459
assertThat(resultOs.get("script").get("source").asText()).isEqualTo("ctx._source.merge(params.doc)");
457460
assertThat(resultOs.get("script").has("custom_field")).isFalse();
458461
}
462+
463+
@Test
464+
void test_calculateDepthForRdsSource_without_source_coordination_identifier() {
465+
String mockPrefix = "my-bucket/path";
466+
DynamicConfigTransformer transformer = spy(new DynamicConfigTransformer(mock(RuleEvaluator.class)));
467+
doReturn(null).when(transformer).getSourceCoordinationIdentifier();
468+
String result = transformer.calculateDepthForRdsSource(mockPrefix);
469+
assertEquals("4", result);
470+
}
471+
472+
@Test
473+
void test_calculateDepthForRdsSource_with_source_coordination_identifier() {
474+
String mockPrefix = "my-bucket/path";
475+
DynamicConfigTransformer transformer = spy(new DynamicConfigTransformer(mock(RuleEvaluator.class)));
476+
doReturn("testValue").when(transformer).getSourceCoordinationIdentifier();
477+
String result = transformer.calculateDepthForRdsSource(mockPrefix);
478+
assertEquals("5", result);
479+
}
480+
459481
}

0 commit comments

Comments
 (0)