Skip to content

Commit 062ae95

Browse files
authored
Use an ObjectInputFilter to serialize allow deserialization of only certain objects in peer-to-peer connections. Additionally, it refactors some application configurations to improve integration testing. Fixes opensearch-project#2310. (opensearch-project#2311)
Signed-off-by: David Venable <dlv@amazon.com>
1 parent 15d590d commit 062ae95

10 files changed

Lines changed: 574 additions & 53 deletions

File tree

data-prepper-core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ dependencies {
3838
exclude group: 'commons-logging', module: 'commons-logging'
3939
}
4040
implementation 'software.amazon.cloudwatchlogs:aws-embedded-metrics:2.0.0-beta-1'
41+
testImplementation 'org.apache.logging.log4j:log4j-jpl:2.17.0'
4142
testImplementation 'org.springframework:spring-test:5.3.25'
4243
implementation libs.armeria.core
4344
implementation libs.armeria.grpc

data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/PeerForwarderAppConfig.java

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,10 @@
55

66
package org.opensearch.dataprepper.peerforwarder;
77

8-
import com.fasterxml.jackson.databind.ObjectMapper;
9-
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
10-
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
118
import org.opensearch.dataprepper.metrics.PluginMetrics;
129
import org.opensearch.dataprepper.parser.model.DataPrepperConfiguration;
1310
import org.opensearch.dataprepper.peerforwarder.certificate.CertificateProviderFactory;
1411
import org.opensearch.dataprepper.peerforwarder.client.PeerForwarderClient;
15-
import org.opensearch.dataprepper.peerforwarder.codec.JacksonPeerForwarderCodec;
16-
import org.opensearch.dataprepper.peerforwarder.codec.JavaPeerForwarderCodec;
1712
import org.opensearch.dataprepper.peerforwarder.codec.PeerForwarderCodec;
1813
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderHttpServerProvider;
1914
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderHttpService;
@@ -24,7 +19,6 @@
2419
import org.springframework.beans.factory.annotation.Qualifier;
2520
import org.springframework.context.annotation.Bean;
2621
import org.springframework.context.annotation.Configuration;
27-
import org.yaml.snakeyaml.LoaderOptions;
2822

2923
@Configuration
3024
class PeerForwarderAppConfig {
@@ -36,17 +30,6 @@ public PluginMetrics pluginMetrics() {
3630
return PluginMetrics.fromNames(COMPONENT_ID, COMPONENT_SCOPE);
3731
}
3832

39-
@Bean(name = "peerForwarderObjectMapper")
40-
public ObjectMapper objectMapper() {
41-
final JavaTimeModule javaTimeModule = new JavaTimeModule();
42-
final LoaderOptions loaderOptions = new LoaderOptions();
43-
loaderOptions.setCodePointLimit(10 * 1024 * 1024); // 10MB
44-
final YAMLFactory yamlFactory = YAMLFactory.builder()
45-
.loaderOptions(loaderOptions)
46-
.build();
47-
return new ObjectMapper(yamlFactory).registerModule(javaTimeModule);
48-
}
49-
5033
@Bean
5134
public PeerForwarderConfiguration peerForwarderConfiguration(
5235
@Autowired(required = false) final DataPrepperConfiguration dataPrepperConfiguration) {
@@ -100,14 +83,6 @@ public ResponseHandler responseHandler(@Qualifier("peerForwarderMetrics") final
10083
return new ResponseHandler(pluginMetrics);
10184
}
10285

103-
@Bean
104-
public PeerForwarderCodec peerForwarderCodec(
105-
final PeerForwarderConfiguration peerForwarderConfiguration,
106-
@Qualifier("peerForwarderObjectMapper") final ObjectMapper objectMapper) {
107-
return peerForwarderConfiguration.getBinaryCodec() ?
108-
new JavaPeerForwarderCodec() : new JacksonPeerForwarderCodec(objectMapper);
109-
}
110-
11186
@Bean
11287
public PeerForwarderHttpService peerForwarderHttpService(
11388
final ResponseHandler responseHandler,

data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/codec/JavaPeerForwarderCodec.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,17 @@
66
import java.io.ByteArrayOutputStream;
77
import java.io.IOException;
88
import java.io.InputStream;
9+
import java.io.ObjectInputFilter;
910
import java.io.ObjectInputStream;
1011
import java.io.ObjectOutputStream;
12+
import java.util.Objects;
1113

1214
public class JavaPeerForwarderCodec implements PeerForwarderCodec {
15+
private final ObjectInputFilter filter;
16+
17+
public JavaPeerForwarderCodec(final ObjectInputFilter filter) {
18+
this.filter = Objects.requireNonNull(filter);
19+
}
1320

1421
@Override
1522
public byte[] serialize(final PeerForwardingEvents events) throws IOException {
@@ -24,6 +31,7 @@ public byte[] serialize(final PeerForwardingEvents events) throws IOException {
2431
public PeerForwardingEvents deserialize(final byte[] bytes) throws IOException, ClassNotFoundException {
2532
try (final InputStream inputStream = new ByteArrayInputStream(bytes);
2633
final ObjectInputStream objectInputStream = new ObjectInputStream(inputStream)) {
34+
objectInputStream.setObjectInputFilter(filter);
2735
return (PeerForwardingEvents) objectInputStream.readObject();
2836
}
2937
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.peerforwarder.codec;
7+
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
import java.io.ObjectInputFilter;
12+
import java.util.Objects;
13+
14+
/**
15+
* A decorator for {@link ObjectInputFilter} which logs information when the filter is rejected.
16+
*/
17+
class LoggingObjectInputFilter implements ObjectInputFilter {
18+
private static final Logger LOG = LoggerFactory.getLogger(LoggingObjectInputFilter.class);
19+
private final ObjectInputFilter filter;
20+
21+
public LoggingObjectInputFilter(final ObjectInputFilter filter) {
22+
this.filter = Objects.requireNonNull(filter);
23+
}
24+
25+
@Override
26+
public Status checkInput(final FilterInfo filterInfo) {
27+
final Status status = filter.checkInput(filterInfo);
28+
29+
if(status == Status.REJECTED) {
30+
LOG.warn("Unable to deserialize: {}", filterInfo.serialClass());
31+
}
32+
33+
return status;
34+
}
35+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.peerforwarder.codec;
7+
8+
import com.fasterxml.jackson.databind.ObjectMapper;
9+
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
10+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
11+
import org.opensearch.dataprepper.peerforwarder.PeerForwarderConfiguration;
12+
import org.springframework.beans.factory.annotation.Qualifier;
13+
import org.springframework.context.annotation.Bean;
14+
import org.springframework.context.annotation.Configuration;
15+
import org.yaml.snakeyaml.LoaderOptions;
16+
17+
import java.io.ObjectInputFilter;
18+
19+
@Configuration
20+
public class PeerForwarderCodecAppConfig {
21+
@Bean
22+
public PeerForwarderCodec peerForwarderCodec(
23+
final PeerForwarderConfiguration peerForwarderConfiguration,
24+
final ObjectInputFilter objectInputFilter,
25+
@Qualifier("peerForwarderObjectMapper") final ObjectMapper objectMapper) {
26+
return peerForwarderConfiguration.getBinaryCodec() ?
27+
new JavaPeerForwarderCodec(objectInputFilter) : new JacksonPeerForwarderCodec(objectMapper);
28+
}
29+
30+
@Bean(name = "peerForwarderObjectMapper")
31+
public ObjectMapper objectMapper() {
32+
final JavaTimeModule javaTimeModule = new JavaTimeModule();
33+
final LoaderOptions loaderOptions = new LoaderOptions();
34+
loaderOptions.setCodePointLimit(10 * 1024 * 1024); // 10MB
35+
final YAMLFactory yamlFactory = YAMLFactory.builder()
36+
.loaderOptions(loaderOptions)
37+
.build();
38+
return new ObjectMapper(yamlFactory).registerModule(javaTimeModule);
39+
}
40+
41+
@Bean
42+
public ObjectInputFilter objectInputFilter() {
43+
final String baseModelPackage = "org.opensearch.dataprepper.model";
44+
45+
final String pattern =
46+
"java.lang.Object;" +
47+
"java.util.*;" +
48+
"java.time.*;" +
49+
"com.fasterxml.jackson.databind.node.*;" +
50+
"org.opensearch.dataprepper.peerforwarder.model.*;" +
51+
baseModelPackage + ".event.*;" +
52+
baseModelPackage + ".trace.*;" +
53+
baseModelPackage + ".log.*;" +
54+
baseModelPackage + ".metric.*;" +
55+
baseModelPackage + ".document.*;" +
56+
"com.google.common.collect.ImmutableMap*;" +
57+
"com.google.common.collect.RegularImmutableMap*;" +
58+
"!*";
59+
60+
final ObjectInputFilter filter = ObjectInputFilter.Config.createFilter(pattern);
61+
62+
return new LoggingObjectInputFilter(filter);
63+
}
64+
}

data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/PeerForwarder_ClientServerIT.java

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,7 @@
55

66
package org.opensearch.dataprepper.peerforwarder;
77

8-
import org.junit.jupiter.params.ParameterizedTest;
9-
import org.junit.jupiter.params.provider.ValueSource;
10-
import org.opensearch.dataprepper.metrics.PluginMetrics;
11-
import org.opensearch.dataprepper.model.CheckpointState;
12-
import org.opensearch.dataprepper.model.event.Event;
13-
import org.opensearch.dataprepper.model.event.JacksonEvent;
14-
import org.opensearch.dataprepper.model.record.Record;
158
import com.fasterxml.jackson.databind.ObjectMapper;
16-
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
179
import com.linecorp.armeria.client.UnprocessedRequestException;
1810
import com.linecorp.armeria.common.AggregatedHttpResponse;
1911
import com.linecorp.armeria.common.ClosedSessionException;
@@ -22,8 +14,16 @@
2214
import org.junit.jupiter.api.AfterEach;
2315
import org.junit.jupiter.api.BeforeEach;
2416
import org.junit.jupiter.api.Nested;
17+
import org.junit.jupiter.params.ParameterizedTest;
18+
import org.junit.jupiter.params.provider.ValueSource;
19+
import org.opensearch.dataprepper.metrics.PluginMetrics;
20+
import org.opensearch.dataprepper.model.CheckpointState;
21+
import org.opensearch.dataprepper.model.event.Event;
22+
import org.opensearch.dataprepper.model.event.JacksonEvent;
23+
import org.opensearch.dataprepper.model.record.Record;
2524
import org.opensearch.dataprepper.peerforwarder.certificate.CertificateProviderFactory;
2625
import org.opensearch.dataprepper.peerforwarder.client.PeerForwarderClient;
26+
import org.opensearch.dataprepper.peerforwarder.codec.PeerForwarderCodecAppConfig;
2727
import org.opensearch.dataprepper.peerforwarder.codec.JacksonPeerForwarderCodec;
2828
import org.opensearch.dataprepper.peerforwarder.codec.JavaPeerForwarderCodec;
2929
import org.opensearch.dataprepper.peerforwarder.codec.PeerForwarderCodec;
@@ -33,6 +33,7 @@
3333
import org.opensearch.dataprepper.peerforwarder.server.PeerForwarderServer;
3434
import org.opensearch.dataprepper.peerforwarder.server.RemotePeerForwarderServer;
3535
import org.opensearch.dataprepper.peerforwarder.server.ResponseHandler;
36+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
3637

3738
import javax.net.ssl.SSLHandshakeException;
3839
import java.util.Collection;
@@ -71,18 +72,16 @@ class PeerForwarder_ClientServerIT {
7172
private ObjectMapper objectMapper;
7273
private JavaPeerForwarderCodec javaPeerForwarderCodec;
7374
private JacksonPeerForwarderCodec jacksonPeerForwarderCodec;
75+
private PeerForwarderConfiguration peerForwarderConfiguration;
7476
private String pipelineName;
7577
private String pluginId;
7678
private List<Record<Event>> outgoingRecords;
7779
private Set<String> expectedMessages;
7880
private PluginMetrics pluginMetrics;
81+
private AnnotationConfigApplicationContext applicationContext;
7982

8083
@BeforeEach
8184
void setUp() {
82-
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
83-
javaPeerForwarderCodec = new JavaPeerForwarderCodec();
84-
jacksonPeerForwarderCodec = new JacksonPeerForwarderCodec(objectMapper);
85-
8685
outgoingRecords = IntStream.range(0, 5)
8786
.mapToObj(i -> UUID.randomUUID().toString())
8887
.map(JacksonEvent::fromMessage)
@@ -99,12 +98,20 @@ void setUp() {
9998
.collect(Collectors.toSet());
10099
}
101100

101+
private void setupApplicationContext() {
102+
applicationContext = new AnnotationConfigApplicationContext();
103+
104+
applicationContext.scan(PeerForwarderCodecAppConfig.class.getPackage().getName());
105+
applicationContext.registerBean("peerForwarderConfiguration", PeerForwarderConfiguration.class, () -> peerForwarderConfiguration);
106+
applicationContext.refresh();
107+
}
108+
109+
102110
private PeerForwarderServer createServer(
103111
final PeerForwarderConfiguration peerForwarderConfiguration,
104112
final CertificateProviderFactory certificateProviderFactory,
105113
final PeerForwarderProvider peerForwarderProvider) {
106-
final PeerForwarderCodec peerForwarderCodec = peerForwarderConfiguration.getBinaryCodec()?
107-
javaPeerForwarderCodec : jacksonPeerForwarderCodec;
114+
final PeerForwarderCodec peerForwarderCodec = applicationContext.getBean(PeerForwarderCodec.class);
108115
final PeerForwarderHttpService peerForwarderHttpService = new PeerForwarderHttpService(new ResponseHandler(pluginMetrics), peerForwarderProvider, peerForwarderConfiguration,
109116
peerForwarderCodec, pluginMetrics);
110117
Objects.requireNonNull(peerForwarderConfiguration, "Nested classes must supply peerForwarderConfiguration");
@@ -133,8 +140,7 @@ private PeerForwarderClient createClient(
133140
final PeerClientPool peerClientPool = new PeerClientPool();
134141
final PeerForwarderClientFactory peerForwarderClientFactory = new PeerForwarderClientFactory(peerForwarderConfiguration, peerClientPool, certificateProviderFactory, pluginMetrics);
135142
peerForwarderClientFactory.setPeerClientPool();
136-
final PeerForwarderCodec peerForwarderCodec = peerForwarderConfiguration.getBinaryCodec()?
137-
javaPeerForwarderCodec : jacksonPeerForwarderCodec;
143+
final PeerForwarderCodec peerForwarderCodec = applicationContext.getBean(PeerForwarderCodec.class);
138144
return new PeerForwarderClient(peerForwarderConfiguration, peerForwarderClientFactory, peerForwarderCodec, pluginMetrics);
139145
}
140146

@@ -149,14 +155,14 @@ private Collection<Record<Event>> getServerSideRecords(final PeerForwarderProvid
149155

150156
@Nested
151157
class WithSSL {
152-
153-
private PeerForwarderConfiguration peerForwarderConfiguration;
154158
private PeerForwarderServer server;
155159
private PeerForwarderProvider peerForwarderProvider;
156160

157161
void setUpServer(final boolean binaryCodec) {
158162
peerForwarderConfiguration = createConfiguration(true, ForwardingAuthentication.UNAUTHENTICATED, binaryCodec);
159163

164+
setupApplicationContext();
165+
160166
final CertificateProviderFactory certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration);
161167
peerForwarderProvider = createPeerForwarderProvider(peerForwarderConfiguration, certificateProviderFactory);
162168
peerForwarderProvider.register(pipelineName, pluginId, Collections.singleton(UUID.randomUUID().toString()), PIPELINE_WORKER_THREADS);
@@ -269,14 +275,14 @@ void send_Events_with_fingerprint_verification_to_unknown_server_should_throw(fi
269275

270276
@Nested
271277
class WithoutSSL {
272-
273-
private PeerForwarderConfiguration peerForwarderConfiguration;
274278
private PeerForwarderServer server;
275279
private PeerForwarderProvider peerForwarderProvider;
276280

277281
void setUpServer(final boolean binaryCodec) {
278282
peerForwarderConfiguration = createConfiguration(false, ForwardingAuthentication.UNAUTHENTICATED, binaryCodec);
279283

284+
setupApplicationContext();
285+
280286
final CertificateProviderFactory certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration);
281287
peerForwarderProvider = createPeerForwarderProvider(peerForwarderConfiguration, certificateProviderFactory);
282288
peerForwarderProvider.register(pipelineName, pluginId, Collections.singleton(UUID.randomUUID().toString()), PIPELINE_WORKER_THREADS);
@@ -328,14 +334,14 @@ void send_Events_to_server_when_expecting_SSL_should_throw(final boolean binaryC
328334

329335
@Nested
330336
class WithMutualTls {
331-
332-
private PeerForwarderConfiguration peerForwarderConfiguration;
333337
private PeerForwarderServer server;
334338
private PeerForwarderProvider peerForwarderProvider;
335339

336340
void setUpServer(final boolean binaryCodec) {
337341
peerForwarderConfiguration = createConfiguration(true, ForwardingAuthentication.MUTUAL_TLS, binaryCodec);
338342

343+
setupApplicationContext();
344+
339345
final CertificateProviderFactory certificateProviderFactory = new CertificateProviderFactory(peerForwarderConfiguration);
340346
peerForwarderProvider = createPeerForwarderProvider(peerForwarderConfiguration, certificateProviderFactory);
341347
peerForwarderProvider.register(pipelineName, pluginId, Collections.singleton(UUID.randomUUID().toString()), PIPELINE_WORKER_THREADS);

0 commit comments

Comments
 (0)