Skip to content

Commit 32673a9

Browse files
committed
add unit tests for s3 enrich processor
Signed-off-by: Xun Zhang <xunzh@amazon.com>
1 parent a781b13 commit 32673a9

8 files changed

Lines changed: 1160 additions & 1 deletion

File tree

data-prepper-plugins/s3-enrich-processor/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
/*
22
* Copyright OpenSearch Contributors
33
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
49
*/
510

611
dependencies {

data-prepper-plugins/s3-enrich-processor/src/main/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/s3source/S3ObjectWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ private void doProcessObject(final S3ObjectReference s3ObjectReference) throws E
9292
try {
9393
Event event = record.getData();
9494

95-
String correlationValue = event.getJsonNode().get(s3EnricherProcessorConfig.getCorrelationKey()).asText();
95+
String correlationValue = event.getJsonNode().get(s3EnricherProcessorConfig.getCorrelationKeys().get(0)).asText();
9696

9797
event.getMetadata().setExternalOriginationTime(originationTime);
9898
event.getEventHandle().setExternalOriginationTime(originationTime);

data-prepper-plugins/s3-enrich-processor/src/test/java/org/opensearch/dataprepper/plugins/s3_enrich/processor/S3EnrichProcessorTest.java

Lines changed: 421 additions & 0 deletions
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.s3_enrich.processor.cache;
12+
13+
import com.github.benmanes.caffeine.cache.Cache;
14+
import org.junit.jupiter.api.BeforeEach;
15+
import org.junit.jupiter.api.Test;
16+
import org.junit.jupiter.api.extension.ExtendWith;
17+
import org.mockito.Mock;
18+
import org.mockito.junit.jupiter.MockitoExtension;
19+
import org.mockito.junit.jupiter.MockitoSettings;
20+
import org.mockito.quality.Strictness;
21+
import org.opensearch.dataprepper.model.event.Event;
22+
import org.opensearch.dataprepper.plugins.s3_enrich.processor.S3EnrichProcessorConfig;
23+
24+
import java.time.Duration;
25+
26+
import static org.hamcrest.CoreMatchers.notNullValue;
27+
import static org.hamcrest.MatcherAssert.assertThat;
28+
import static org.mockito.Mockito.when;
29+
30+
@ExtendWith(MockitoExtension.class)
31+
@MockitoSettings(strictness = Strictness.LENIENT)
32+
class CacheFactoryTest {
33+
34+
@Mock
35+
private S3EnrichProcessorConfig config;
36+
37+
@BeforeEach
38+
void setUp() {
39+
when(config.getCacheTtl()).thenReturn(Duration.ofMinutes(10));
40+
when(config.getCacheSizeLimit()).thenReturn(1000);
41+
}
42+
43+
private CacheFactory createObjectUnderTest() {
44+
return new CacheFactory(config);
45+
}
46+
47+
@Test
48+
void constructor_builds_s3Cache_on_initialization() {
49+
final CacheFactory objectUnderTest = createObjectUnderTest();
50+
51+
assertThat(objectUnderTest.getS3Cache(), notNullValue());
52+
}
53+
54+
@Test
55+
void getS3Cache_returns_same_instance_across_calls() {
56+
final CacheFactory objectUnderTest = createObjectUnderTest();
57+
58+
final Cache<String, Cache<String, Event>> first = objectUnderTest.getS3Cache();
59+
final Cache<String, Cache<String, Event>> second = objectUnderTest.getS3Cache();
60+
61+
assertThat(first, notNullValue());
62+
assertThat(first == second, org.hamcrest.CoreMatchers.equalTo(true));
63+
}
64+
65+
@Test
66+
void createEventsCache_returns_new_cache_per_call() {
67+
final CacheFactory objectUnderTest = createObjectUnderTest();
68+
69+
final Cache<String, Event> first = objectUnderTest.createEventsCache("s3://bucket/key1");
70+
final Cache<String, Event> second = objectUnderTest.createEventsCache("s3://bucket/key2");
71+
72+
assertThat(first, notNullValue());
73+
assertThat(second, notNullValue());
74+
assertThat(first != second, org.hamcrest.CoreMatchers.equalTo(true));
75+
}
76+
77+
@Test
78+
void createEventsCache_accepts_configured_max_size() {
79+
when(config.getCacheSizeLimit()).thenReturn(500);
80+
final CacheFactory objectUnderTest = createObjectUnderTest();
81+
82+
final Cache<String, Event> cache = objectUnderTest.createEventsCache("s3://bucket/key");
83+
84+
assertThat(cache, notNullValue());
85+
}
86+
87+
@Test
88+
void createEventsCache_with_short_ttl_still_creates_valid_cache() {
89+
when(config.getCacheTtl()).thenReturn(Duration.ofSeconds(1));
90+
final CacheFactory objectUnderTest = createObjectUnderTest();
91+
92+
final Cache<String, Event> cache = objectUnderTest.createEventsCache("s3://bucket/key");
93+
94+
assertThat(cache, notNullValue());
95+
}
96+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.s3_enrich.processor.cache;
12+
13+
import com.github.benmanes.caffeine.cache.Cache;
14+
import com.github.benmanes.caffeine.cache.Caffeine;
15+
import org.junit.jupiter.api.BeforeEach;
16+
import org.junit.jupiter.api.Test;
17+
import org.junit.jupiter.api.extension.ExtendWith;
18+
import org.mockito.Mock;
19+
import org.mockito.junit.jupiter.MockitoExtension;
20+
import org.opensearch.dataprepper.model.event.Event;
21+
22+
import java.time.Duration;
23+
import java.util.UUID;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
26+
import static org.hamcrest.CoreMatchers.equalTo;
27+
import static org.hamcrest.CoreMatchers.notNullValue;
28+
import static org.hamcrest.CoreMatchers.nullValue;
29+
import static org.hamcrest.MatcherAssert.assertThat;
30+
import static org.mockito.ArgumentMatchers.anyString;
31+
import static org.mockito.Mockito.mock;
32+
import static org.mockito.Mockito.times;
33+
import static org.mockito.Mockito.verify;
34+
import static org.mockito.Mockito.when;
35+
36+
@ExtendWith(MockitoExtension.class)
37+
class S3EnricherCacheServiceTest {
38+
39+
@Mock
40+
private CacheFactory cacheFactory;
41+
42+
private S3EnricherCacheService objectUnderTest;
43+
44+
@BeforeEach
45+
void setUp() {
46+
final Cache<String, Cache<String, Event>> outerCache = Caffeine.newBuilder()
47+
.maximumSize(100)
48+
.expireAfterAccess(Duration.ofMinutes(10))
49+
.build();
50+
when(cacheFactory.getS3Cache()).thenReturn(outerCache);
51+
objectUnderTest = new S3EnricherCacheService(cacheFactory);
52+
}
53+
54+
@Test
55+
void getOrCreateRecordCache_creates_new_cache_for_unknown_s3Url() {
56+
final String s3Url = "s3://bucket/key";
57+
final Cache<String, Event> newCache = Caffeine.newBuilder().maximumSize(100).build();
58+
when(cacheFactory.createEventsCache(s3Url)).thenReturn(newCache);
59+
60+
final Cache<String, Event> result = objectUnderTest.getOrCreateRecordCache(s3Url);
61+
62+
assertThat(result, notNullValue());
63+
verify(cacheFactory).createEventsCache(s3Url);
64+
}
65+
66+
@Test
67+
void getOrCreateRecordCache_returns_existing_cache_for_known_s3Url() {
68+
final String s3Url = "s3://bucket/key";
69+
final Cache<String, Event> newCache = Caffeine.newBuilder().maximumSize(100).build();
70+
when(cacheFactory.createEventsCache(s3Url)).thenReturn(newCache);
71+
72+
objectUnderTest.getOrCreateRecordCache(s3Url);
73+
final Cache<String, Event> result = objectUnderTest.getOrCreateRecordCache(s3Url);
74+
75+
assertThat(result, notNullValue());
76+
verify(cacheFactory, times(1)).createEventsCache(s3Url); // only created once
77+
}
78+
79+
@Test
80+
void getRecordCacheIfPresent_returns_null_for_unknown_s3Url() {
81+
final Cache<String, Event> result = objectUnderTest.getRecordCacheIfPresent("s3://bucket/unknown");
82+
83+
assertThat(result, nullValue());
84+
}
85+
86+
@Test
87+
void getRecordCacheIfPresent_returns_cache_for_known_s3Url() {
88+
final String s3Url = "s3://bucket/key";
89+
final Cache<String, Event> newCache = Caffeine.newBuilder().maximumSize(100).build();
90+
when(cacheFactory.createEventsCache(s3Url)).thenReturn(newCache);
91+
objectUnderTest.getOrCreateRecordCache(s3Url);
92+
93+
final Cache<String, Event> result = objectUnderTest.getRecordCacheIfPresent(s3Url);
94+
95+
assertThat(result, notNullValue());
96+
}
97+
98+
@Test
99+
void put_and_get_returns_stored_event() {
100+
final String s3Url = "s3://bucket/key";
101+
final String recordId = UUID.randomUUID().toString();
102+
final Event event = mock(Event.class);
103+
final Cache<String, Event> innerCache = Caffeine.newBuilder().maximumSize(100).build();
104+
when(cacheFactory.createEventsCache(s3Url)).thenReturn(innerCache);
105+
106+
objectUnderTest.put(s3Url, recordId, event);
107+
final Event result = objectUnderTest.get(s3Url, recordId);
108+
109+
assertThat(result, equalTo(event));
110+
}
111+
112+
@Test
113+
void get_returns_null_for_unknown_s3Url() {
114+
final Event result = objectUnderTest.get("s3://bucket/unknown", "record-id");
115+
116+
assertThat(result, nullValue());
117+
}
118+
119+
@Test
120+
void get_returns_null_for_unknown_recordId() {
121+
final String s3Url = "s3://bucket/key";
122+
final Cache<String, Event> innerCache = Caffeine.newBuilder().maximumSize(100).build();
123+
when(cacheFactory.createEventsCache(s3Url)).thenReturn(innerCache);
124+
objectUnderTest.put(s3Url, "record-id-1", mock(Event.class));
125+
126+
final Event result = objectUnderTest.get(s3Url, "record-id-unknown");
127+
128+
assertThat(result, nullValue());
129+
}
130+
131+
@Test
132+
void containsS3Uri_returns_false_for_unknown_uri() {
133+
assertThat(objectUnderTest.containsS3Uri("s3://bucket/unknown"), equalTo(false));
134+
}
135+
136+
@Test
137+
void containsS3Uri_returns_true_after_put() {
138+
final String s3Url = "s3://bucket/key";
139+
final Cache<String, Event> innerCache = Caffeine.newBuilder().maximumSize(100).build();
140+
when(cacheFactory.createEventsCache(s3Url)).thenReturn(innerCache);
141+
objectUnderTest.put(s3Url, "record-id", mock(Event.class));
142+
143+
assertThat(objectUnderTest.containsS3Uri(s3Url), equalTo(true));
144+
}
145+
146+
@Test
147+
void loadIfAbsent_calls_loader_when_not_cached() {
148+
final String s3Url = "s3://bucket/key";
149+
final AtomicInteger callCount = new AtomicInteger(0);
150+
151+
objectUnderTest.loadIfAbsent(s3Url, callCount::incrementAndGet);
152+
153+
assertThat(callCount.get(), equalTo(1));
154+
}
155+
156+
@Test
157+
void loadIfAbsent_does_not_call_loader_when_already_cached() {
158+
final String s3Url = "s3://bucket/key";
159+
final Cache<String, Event> innerCache = Caffeine.newBuilder().maximumSize(100).build();
160+
when(cacheFactory.createEventsCache(s3Url)).thenReturn(innerCache);
161+
objectUnderTest.put(s3Url, "record-id", mock(Event.class));
162+
final AtomicInteger callCount = new AtomicInteger(0);
163+
164+
objectUnderTest.loadIfAbsent(s3Url, callCount::incrementAndGet);
165+
166+
assertThat(callCount.get(), equalTo(0));
167+
}
168+
169+
@Test
170+
void loadIfAbsent_loader_is_called_exactly_once_even_if_called_multiple_times_for_same_uri() {
171+
final String s3Url = "s3://bucket/key";
172+
final Cache<String, Event> innerCache = Caffeine.newBuilder().maximumSize(100).build();
173+
final AtomicInteger callCount = new AtomicInteger(0);
174+
when(cacheFactory.createEventsCache(anyString())).thenReturn(innerCache);
175+
176+
// First call - not cached, so loader runs
177+
objectUnderTest.loadIfAbsent(s3Url, () -> {
178+
callCount.incrementAndGet();
179+
objectUnderTest.put(s3Url, "record-id", mock(Event.class)); // populate cache in loader
180+
});
181+
182+
// Second call - now cached, loader should NOT run
183+
objectUnderTest.loadIfAbsent(s3Url, callCount::incrementAndGet);
184+
185+
assertThat(callCount.get(), equalTo(1));
186+
}
187+
188+
@Test
189+
void getOrLoadRecordCache_calls_loader_when_not_cached() {
190+
final String s3Url = "s3://bucket/key";
191+
final Cache<String, Event> innerCache = Caffeine.newBuilder().maximumSize(100).build();
192+
193+
final Cache<String, Event> result = objectUnderTest.getOrLoadRecordCache(s3Url, () -> innerCache);
194+
195+
assertThat(result, equalTo(innerCache));
196+
}
197+
198+
@Test
199+
void getOrLoadRecordCache_returns_existing_cache_when_already_cached() {
200+
final String s3Url = "s3://bucket/key";
201+
final Cache<String, Event> firstCache = Caffeine.newBuilder().maximumSize(100).build();
202+
final Cache<String, Event> secondCache = Caffeine.newBuilder().maximumSize(100).build();
203+
204+
objectUnderTest.getOrLoadRecordCache(s3Url, () -> firstCache);
205+
final Cache<String, Event> result = objectUnderTest.getOrLoadRecordCache(s3Url, () -> secondCache);
206+
207+
assertThat(result, equalTo(firstCache));
208+
}
209+
210+
@Test
211+
void getRecordCount_returns_zero_for_unknown_s3Url() {
212+
assertThat(objectUnderTest.getRecordCount("s3://bucket/unknown"), equalTo(0L));
213+
}
214+
215+
@Test
216+
void getRecordCount_returns_count_of_stored_records() {
217+
final String s3Url = "s3://bucket/key";
218+
final Cache<String, Event> innerCache = Caffeine.newBuilder().maximumSize(100).build();
219+
when(cacheFactory.createEventsCache(s3Url)).thenReturn(innerCache);
220+
objectUnderTest.put(s3Url, "record-1", mock(Event.class));
221+
objectUnderTest.put(s3Url, "record-2", mock(Event.class));
222+
223+
assertThat(objectUnderTest.getRecordCount(s3Url), equalTo(2L));
224+
}
225+
226+
@Test
227+
void clearAll_removes_all_cached_entries() {
228+
final String s3Url1 = "s3://bucket/key1";
229+
final String s3Url2 = "s3://bucket/key2";
230+
final Cache<String, Event> innerCache1 = Caffeine.newBuilder().maximumSize(100).build();
231+
final Cache<String, Event> innerCache2 = Caffeine.newBuilder().maximumSize(100).build();
232+
when(cacheFactory.createEventsCache(s3Url1)).thenReturn(innerCache1);
233+
when(cacheFactory.createEventsCache(s3Url2)).thenReturn(innerCache2);
234+
235+
objectUnderTest.put(s3Url1, "record-1", mock(Event.class));
236+
objectUnderTest.put(s3Url2, "record-2", mock(Event.class));
237+
238+
objectUnderTest.clearAll();
239+
240+
assertThat(objectUnderTest.containsS3Uri(s3Url1), equalTo(false));
241+
assertThat(objectUnderTest.containsS3Uri(s3Url2), equalTo(false));
242+
}
243+
244+
@Test
245+
void clearS3Uri_removes_only_specified_s3Url() {
246+
final String s3Url1 = "s3://bucket/key1";
247+
final String s3Url2 = "s3://bucket/key2";
248+
final Cache<String, Event> innerCache1 = Caffeine.newBuilder().maximumSize(100).build();
249+
final Cache<String, Event> innerCache2 = Caffeine.newBuilder().maximumSize(100).build();
250+
when(cacheFactory.createEventsCache(s3Url1)).thenReturn(innerCache1);
251+
when(cacheFactory.createEventsCache(s3Url2)).thenReturn(innerCache2);
252+
253+
objectUnderTest.put(s3Url1, "record-1", mock(Event.class));
254+
objectUnderTest.put(s3Url2, "record-2", mock(Event.class));
255+
256+
objectUnderTest.clearS3Uri(s3Url1);
257+
258+
assertThat(objectUnderTest.containsS3Uri(s3Url1), equalTo(false));
259+
assertThat(objectUnderTest.containsS3Uri(s3Url2), equalTo(true));
260+
}
261+
262+
@Test
263+
void clearS3Uri_does_not_throw_for_unknown_uri() {
264+
objectUnderTest.clearS3Uri("s3://bucket/nonexistent");
265+
// no exception expected
266+
}
267+
}

0 commit comments

Comments
 (0)