Skip to content

Commit ed3068b

Browse files
committed
Fix s3 folder scan depth for RDS source when partition prefix is not set
Signed-off-by: Divyansh Bokadia <dbokadia@amazon.com>
1 parent 506a04f commit ed3068b

2 files changed

Lines changed: 30 additions & 1 deletion

File tree

data-prepper-pipeline-parser/src/main/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,13 +427,19 @@ public String calculateDepth(String s3Prefix) {
427427
return Integer.toString(getDepth(s3Prefix, 4));
428428
}
429429

430+
protected String getSourceCoordinationIdentifier() {
431+
return System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
432+
}
433+
430434
/**
431435
* Calculate s3 folder scan depth for RDS source pipeline
432436
* @param s3Prefix: s3 prefix defined in the source configuration
433437
* @return s3 folder scan depth
434438
*/
435439
public String calculateDepthForRdsSource(String s3Prefix) {
436-
return Integer.toString(getDepth(s3Prefix, 3));
440+
String envSourceCoordinationIdentifier = getSourceCoordinationIdentifier();
441+
int baseDepth = envSourceCoordinationIdentifier != null ? 3 : 2;
442+
return Integer.toString(getDepth(s3Prefix, baseDepth));
437443
}
438444

439445
private int getDepth(String s3Prefix, int baseDepth) {

data-prepper-pipeline-parser/src/test/java/org/opensearch/dataprepper/pipeline/parser/transformer/DynamicConfigTransformerTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@
3737
import java.util.stream.Stream;
3838

3939
import static org.assertj.core.api.Assertions.assertThat;
40+
import static org.hamcrest.Matchers.equalTo;
41+
import static org.hamcrest.MatcherAssert.assertThat;
4042
import static org.junit.jupiter.api.Assertions.assertThrows;
43+
import static org.mockito.Mockito.doReturn;
4144
import static org.mockito.Mockito.mock;
45+
import static org.mockito.Mockito.spy;
4246
import static org.mockito.Mockito.when;
4347

4448
class DynamicConfigTransformerTest {
@@ -456,4 +460,23 @@ void test_overlay_directive_overrides_existing_fields() throws Exception {
456460
assertThat(resultOs.get("script").get("source").asText()).isEqualTo("ctx._source.merge(params.doc)");
457461
assertThat(resultOs.get("script").has("custom_field")).isFalse();
458462
}
463+
464+
@Test
465+
void test_calculateDepthForRdsSource_without_source_coordination_identifier() {
466+
String mockPrefix = "my-bucket/path";
467+
DynamicConfigTransformer transformer = spy(new DynamicConfigTransformer(mock(RuleEvaluator.class)));
468+
doReturn(null).when(transformer).getSourceCoordinationIdentifier();
469+
String result = transformer.calculateDepthForRdsSource(mockPrefix);
470+
assertThat(result, equalTo("4"));
471+
}
472+
473+
@Test
474+
void test_calculateDepthForRdsSource_with_source_coordination_identifier() {
475+
String mockPrefix = "my-bucket/path";
476+
DynamicConfigTransformer transformer = spy(new DynamicConfigTransformer(mock(RuleEvaluator.class)));
477+
doReturn("testValue").when(transformer).getSourceCoordinationIdentifier();
478+
String result = transformer.calculateDepthForRdsSource(mockPrefix);
479+
assertThat(result, equalTo("5"));
480+
}
481+
459482
}

0 commit comments

Comments
 (0)