Skip to content

Commit f853bea

Browse files
authored
Add configurable path_prefix parameter (#6674)
Introduce an optional `path_prefix` sink parameter to support OpenSearch instances served behind a reverse proxy under a custom subdirectory. Resolves #6654 Signed-off-by: Luis Pigueiras <luis.pigueiras@cern.ch>
1 parent 64999e8 commit f853bea

5 files changed

Lines changed: 50 additions & 0 deletions

File tree

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public class ConnectionConfiguration {
115115
private final String awsStsExternalId;
116116
private final Map<String, String> awsStsHeaderOverrides;
117117
private final Optional<String> proxy;
118+
private final String pathPrefix;
118119
private final boolean serverless;
119120
private final String serverlessNetworkPolicyName;
120121
private final String serverlessCollectionName;
@@ -160,6 +161,10 @@ Optional<String> getProxy() {
160161
return proxy;
161162
}
162163

164+
String getPathPrefix() {
165+
return pathPrefix;
166+
}
167+
163168
Integer getSocketTimeout() {
164169
return socketTimeout;
165170
}
@@ -219,6 +224,7 @@ private ConnectionConfiguration(final Builder builder) {
219224
this.awsStsExternalId = builder.awsStsExternalId;
220225
this.awsStsHeaderOverrides = builder.awsStsHeaderOverrides;
221226
this.proxy = builder.proxy;
227+
this.pathPrefix = builder.pathPrefix;
222228
this.serverless = builder.serverless;
223229
this.serverlessNetworkPolicyName = builder.serverlessNetworkPolicyName;
224230
this.serverlessCollectionName = builder.serverlessCollectionName;
@@ -312,6 +318,11 @@ public static ConnectionConfiguration readConnectionConfiguration(final OpenSear
312318
builder = builder.withProxy(proxy);
313319
}
314320

321+
final String pathPrefix = openSearchSinkConfig.getPathPrefix();
322+
if (pathPrefix != null) {
323+
builder = builder.withPathPrefix(pathPrefix);
324+
}
325+
315326
final boolean requestCompressionEnabled = openSearchSinkConfig.getEnableRequestCompression();
316327
builder = builder.withRequestCompressionEnabled(requestCompressionEnabled);
317328

@@ -327,6 +338,9 @@ public RestHighLevelClient createClient(AwsCredentialsSupplier awsCredentialsSup
327338
i++;
328339
}
329340
final RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
341+
if (pathPrefix != null) {
342+
restClientBuilder.setPathPrefix(pathPrefix);
343+
}
330344
/*
331345
* Given that this is a patch release, we will support only the IAM based access policy AES domains.
332346
* We will not support FGAC and Custom endpoint domains. This will be followed in the next version.
@@ -654,6 +668,7 @@ public static class Builder {
654668
private String awsStsExternalId;
655669
private Map<String, String> awsStsHeaderOverrides;
656670
private Optional<String> proxy = Optional.empty();
671+
private String pathPrefix;
657672
private String pipelineName;
658673
private boolean serverless;
659674
private String serverlessNetworkPolicyName;
@@ -791,6 +806,11 @@ public Builder withProxy(final String proxy) {
791806
return this;
792807
}
793808

809+
public Builder withPathPrefix(final String pathPrefix) {
810+
this.pathPrefix = pathPrefix;
811+
return this;
812+
}
813+
794814
public Builder withServerless(boolean serverless) {
795815
this.serverless = serverless;
796816
return this;

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/configuration/OpenSearchSinkConfig.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ public boolean isAuthConfigValid() {
8181
@JsonProperty("proxy")
8282
private String proxy = null;
8383

84+
@Getter
85+
@JsonProperty("path_prefix")
86+
private String pathPrefix = null;
87+
8488
@Getter
8589
@JsonProperty("distribution_version")
8690
private String distributionVersion = DistributionVersion.DEFAULT.getVersion();

data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfigurationTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ void testReadConnectionConfigurationDefault() throws JsonProcessingException {
108108
assertNull(connectionConfiguration.getCertPath());
109109
assertNull(connectionConfiguration.getConnectTimeout());
110110
assertNull(connectionConfiguration.getSocketTimeout());
111+
assertNull(connectionConfiguration.getPathPrefix());
111112
assertTrue(connectionConfiguration.isRequestCompressionEnabled());
112113
}
113114

@@ -143,6 +144,17 @@ void testCreateClientDefault() throws IOException {
143144
client.close();
144145
}
145146

147+
@Test
148+
void testReadConnectionConfigurationWithPathPrefix() throws JsonProcessingException {
149+
final Map<String, Object> configMetadata = generateConfigurationMetadata(
150+
TEST_HOSTS, null, null, null, null, false, null, null, null, false);
151+
configMetadata.put("path_prefix", "/os");
152+
final OpenSearchSinkConfig openSearchSinkConfig = getOpenSearchSinkConfigByConfigMetadata(configMetadata);
153+
final ConnectionConfiguration connectionConfiguration =
154+
ConnectionConfiguration.readConnectionConfiguration(openSearchSinkConfig);
155+
assertEquals("/os", connectionConfiguration.getPathPrefix());
156+
}
157+
146158
@Test
147159
void testCreateOpenSearchClientDefault() throws IOException {
148160
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(

data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkConfigurationTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class OpenSearchSinkConfigurationTests {
3131
private static final String INVALID_ACTIONS_WITH_EXPRESSION_CONFIG = "invalid-actions-with-expression";
3232
private static final String CREATE_ACTION_CONFIG = "create-action";
3333
private static final String CREATE_ACTIONS_WITH_EXPRESSION_CONFIG = "create-actions-with-expression";
34+
private static final String VALID_SINK_WITH_PATH_PREFIX_CONFIG = "valid-sink-with-path-prefix";
3435
private ExpressionEvaluator expressionEvaluator;
3536

3637
ObjectMapper objectMapper;
@@ -94,6 +95,13 @@ public void testReadESConfigWithBulkActionCreateExpression() throws IOException
9495
assertNotNull(openSearchSinkConfiguration.getRetryConfiguration());
9596
}
9697

98+
@Test
99+
public void testReadOSConfigWithPathPrefix() throws IOException {
100+
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfig(VALID_SINK_WITH_PATH_PREFIX_CONFIG);
101+
102+
assertEquals("/os", openSearchSinkConfig.getPathPrefix());
103+
}
104+
97105

98106
private OpenSearchSinkConfig generateOpenSearchSinkConfig(String pipelineName) throws IOException {
99107
final File configurationFile = new File(getClass().getClassLoader().getResource(OPEN_SEARCH_SINK_CONFIGURATIONS).getFile());

data-prepper-plugins/opensearch/src/test/resources/open-search-sink-configurations.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ valid-sink:
88
sts_role_arn: "arn:aws:iam::123456789012:role/test-role"
99
region: "us-east-2"
1010
serverless: true
11+
valid-sink-with-path-prefix:
12+
sink:
13+
opensearch:
14+
hosts: [ "http://localhost:9200" ]
15+
index: "no-more-plugin-setting"
16+
path_prefix: "/os"
1117
invalid-action:
1218
sink:
1319
opensearch:

0 commit comments

Comments
 (0)