Skip to content

Commit ce1d02d

Browse files
author
Karan Bhakta
committed
Add remaining test files and resolve merge conflicts
1 parent f6f5111 commit ce1d02d

5 files changed

Lines changed: 478 additions & 2 deletions

File tree

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/livecapture/LiveCaptureAppConfig.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,35 @@ private void initializeLiveCaptureOutputManager(final LiveCaptureConfiguration l
100100
if (eventSink != null) {
101101
LiveCaptureOutputManager.getInstance().initialize(eventSink, entryThreshold, batchSize);
102102
LOG.info("LiveCaptureOutputManager initialized with sink: {}", eventSink.getClass().getSimpleName());
103+
try {
104+
Object sinkConfig = liveCaptureConfig.getLiveCaptureOutputSinkConfig();
105+
if (!(sinkConfig instanceof Map)) {
106+
return;
107+
}
108+
109+
@SuppressWarnings("unchecked")
110+
Map<String, Object> sinkConfigMap = (Map<String, Object>) sinkConfig;
111+
112+
int entryThreshold = getIntValue(sinkConfigMap, "entry_threshold", 1);
113+
int batchSize = getIntValue(sinkConfigMap, "batch_size", 1);
114+
115+
Sink<Record<Event>> eventSink = null;
116+
for (Map.Entry<String, Object> entry : sinkConfigMap.entrySet()) {
117+
if (!"entry_threshold".equals(entry.getKey()) && !"batch_size".equals(entry.getKey())) {
118+
eventSink = createPluginBasedSink(entry.getKey(), entry.getValue());
119+
break;
120+
}
121+
}
122+
123+
if (eventSink != null) {
124+
LiveCaptureOutputManager.getInstance().initialize(eventSink, entryThreshold, batchSize);
125+
LiveCaptureOutputManager.getInstance().enable();
126+
LOG.info("LiveCaptureOutputManager initialized with sink: {}", eventSink.getClass().getSimpleName());
127+
} else {
128+
LOG.warn("No valid sink configuration found in live_capture_out");
129+
}
130+
} catch (Exception e) {
131+
LOG.error("Failed to initialize LiveCaptureOutputManager: {}", e.getMessage(), e);
103132
}
104133
}
105134

@@ -131,6 +160,43 @@ private Sink<Record<Event>> createPluginBasedSink(String sinkType, Object sinkSe
131160
@PreDestroy
132161
public void shutdownLiveCapture() {
133162
LiveCaptureOutputManager.getInstance().shutdown();
163+
try {
164+
if (!(sinkSettings instanceof Map)) {
165+
LOG.error("Sink settings must be a Map, got: {}", sinkSettings != null ? sinkSettings.getClass() : "null");
166+
return null;
167+
}
168+
169+
@SuppressWarnings("unchecked")
170+
Map<String, Object> settingsMap = (Map<String, Object>) sinkSettings;
171+
172+
PluginFactory pluginFactory = applicationContext.getBean(PluginFactory.class);
173+
PluginSetting pluginSetting = new PluginSetting(sinkType, settingsMap);
174+
pluginSetting.setPipelineName("live-capture-pipeline");
175+
176+
@SuppressWarnings("unchecked")
177+
Sink<Record<Event>> sink = (Sink<Record<Event>>) pluginFactory.loadPlugin(
178+
Sink.class, pluginSetting, new SinkContext(null));
179+
180+
LOG.info("Created {} sink for live capture", sinkType);
181+
return sink;
182+
} catch (Exception e) {
183+
LOG.error("Failed to create sink for type '{}': {}", sinkType, e.getMessage(), e);
184+
return null;
185+
}
186+
}
187+
188+
189+
/**
190+
* Shuts down live capture components when the application context is destroyed.
191+
*/
192+
@PreDestroy
193+
public void shutdownLiveCapture() {
194+
try {
195+
LiveCaptureOutputManager.getInstance().shutdown();
196+
LOG.info("LiveCaptureOutputManager shutdown completed");
197+
} catch (Exception e) {
198+
LOG.error("Error during LiveCaptureOutputManager shutdown: {}", e.getMessage(), e);
199+
}
134200
}
135201

136202
/**
@@ -145,4 +211,4 @@ public LiveCaptureHandler liveCaptureHandler(final EventFactory eventFactory) {
145211
}
146212

147213

148-
}
214+
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/livecapture/LiveCaptureConfiguration.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
*/
1515
public class LiveCaptureConfiguration {
1616

17+
private static final boolean DEFAULT_ENABLED = false;
18+
private static final double DEFAULT_RATE = 1.0;
1719

1820
private final boolean defaultEnabled;
1921
private final double defaultRate;
@@ -22,6 +24,12 @@ public class LiveCaptureConfiguration {
2224
public LiveCaptureConfiguration() {
2325
this.defaultEnabled = false;
2426
this.defaultRate = 1.0;
27+
/**
28+
* Default constructor with default values.
29+
*/
30+
public LiveCaptureConfiguration() {
31+
this.defaultEnabled = DEFAULT_ENABLED;
32+
this.defaultRate = DEFAULT_RATE;
2533
this.liveCaptureOutputSinkConfig = null;
2634
}
2735

@@ -35,6 +43,14 @@ public LiveCaptureConfiguration(
3543
this.defaultRate = defaultRate != null ? defaultRate : 1.0;
3644
this.liveCaptureOutputSinkConfig = liveCaptureOutputSinkConfig;
3745

46+
this.defaultEnabled = defaultEnabled != null ? defaultEnabled : DEFAULT_ENABLED;
47+
this.defaultRate = defaultRate != null ? defaultRate : DEFAULT_RATE;
48+
this.liveCaptureOutputSinkConfig = liveCaptureOutputSinkConfig;
49+
50+
// validate rate
51+
if (this.defaultRate <= 0) {
52+
throw new IllegalArgumentException("default_rate must be positive, got: " + this.defaultRate);
53+
}
3854
}
3955

4056
public boolean isDefaultEnabled() {
@@ -49,4 +65,12 @@ public Object getLiveCaptureOutputSinkConfig() {
4965
return liveCaptureOutputSinkConfig;
5066
}
5167

52-
}
68+
@Override
69+
public String toString() {
70+
return "LiveCaptureConfiguration{" +
71+
"defaultEnabled=" + defaultEnabled +
72+
", defaultRate=" + defaultRate +
73+
", liveCaptureOutputSinkConfig=" + liveCaptureOutputSinkConfig +
74+
'}';
75+
}
76+
}
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.core.livecapture;
7+
8+
import org.junit.jupiter.api.BeforeEach;
9+
import org.junit.jupiter.api.Test;
10+
import org.junit.jupiter.api.extension.ExtendWith;
11+
import org.mockito.Mock;
12+
import org.mockito.MockedStatic;
13+
import org.mockito.junit.jupiter.MockitoExtension;
14+
import org.opensearch.dataprepper.core.parser.model.DataPrepperConfiguration;
15+
import org.opensearch.dataprepper.model.configuration.PluginSetting;
16+
import org.opensearch.dataprepper.model.event.Event;
17+
import org.opensearch.dataprepper.model.event.EventFactory;
18+
import org.opensearch.dataprepper.model.plugin.PluginFactory;
19+
import org.opensearch.dataprepper.model.record.Record;
20+
import org.opensearch.dataprepper.model.sink.Sink;
21+
import org.opensearch.dataprepper.model.sink.SinkContext;
22+
import org.springframework.context.ApplicationContext;
23+
24+
import java.util.Map;
25+
26+
import static org.hamcrest.MatcherAssert.assertThat;
27+
import static org.hamcrest.Matchers.instanceOf;
28+
import static org.hamcrest.Matchers.notNullValue;
29+
import static org.mockito.ArgumentMatchers.any;
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.mockStatic;
32+
import static org.mockito.Mockito.verify;
33+
import static org.mockito.Mockito.when;
34+
35+
@ExtendWith(MockitoExtension.class)
36+
class LiveCaptureAppConfigTest {
37+
38+
@Mock
39+
private DataPrepperConfiguration dataPrepperConfiguration;
40+
41+
@Mock
42+
private LiveCaptureConfiguration liveCaptureConfiguration;
43+
44+
@Mock
45+
private EventFactory defaultEventFactory;
46+
47+
@Mock
48+
private LiveCaptureOutputManager mockOutputManager;
49+
50+
@Mock
51+
private ApplicationContext applicationContext;
52+
53+
@Mock
54+
private PluginFactory pluginFactory;
55+
56+
@Mock
57+
private Sink<Record<Event>> mockSink;
58+
59+
private LiveCaptureAppConfig liveCaptureAppConfig;
60+
61+
@BeforeEach
62+
void setUp() {
63+
liveCaptureAppConfig = new LiveCaptureAppConfig(dataPrepperConfiguration);
64+
liveCaptureAppConfig.setApplicationContext(applicationContext);
65+
}
66+
67+
@Test
68+
void initializeLiveCaptureManager_with_valid_configuration() {
69+
when(dataPrepperConfiguration.getLiveCaptureConfiguration()).thenReturn(liveCaptureConfiguration);
70+
when(liveCaptureConfiguration.isDefaultEnabled()).thenReturn(true);
71+
when(liveCaptureConfiguration.getDefaultRate()).thenReturn(5.0);
72+
73+
try (MockedStatic<LiveCaptureManager> mockedLiveCaptureManager = mockStatic(LiveCaptureManager.class)) {
74+
liveCaptureAppConfig.initializeLiveCaptureManager();
75+
76+
mockedLiveCaptureManager.verify(() -> LiveCaptureManager.initialize(true, 5.0));
77+
}
78+
}
79+
80+
@Test
81+
void initializeLiveCaptureManager_with_null_configuration() {
82+
when(dataPrepperConfiguration.getLiveCaptureConfiguration()).thenReturn(null);
83+
84+
try (MockedStatic<LiveCaptureManager> mockedLiveCaptureManager = mockStatic(LiveCaptureManager.class)) {
85+
liveCaptureAppConfig.initializeLiveCaptureManager();
86+
87+
// Should use defaults: enabled=false, rate=1.0
88+
mockedLiveCaptureManager.verify(() -> LiveCaptureManager.initialize(false, 1.0));
89+
}
90+
}
91+
92+
@Test
93+
void shutdownLiveCapture_calls_output_manager_shutdown() {
94+
try (MockedStatic<LiveCaptureOutputManager> mockedOutputManager = mockStatic(LiveCaptureOutputManager.class)) {
95+
mockedOutputManager.when(LiveCaptureOutputManager::getInstance).thenReturn(mockOutputManager);
96+
97+
liveCaptureAppConfig.shutdownLiveCapture();
98+
99+
verify(mockOutputManager).shutdown();
100+
}
101+
}
102+
103+
@Test
104+
void liveCaptureHandler_returns_handler_instance() {
105+
EventFactory mockEventFactory = mock(EventFactory.class);
106+
LiveCaptureHandler handler = liveCaptureAppConfig.liveCaptureHandler(mockEventFactory);
107+
108+
assertThat(handler, notNullValue());
109+
assertThat(handler, instanceOf(LiveCaptureHandler.class));
110+
}
111+
112+
113+
114+
@Test
115+
void initializeLiveCaptureManager_with_cloudwatch_sink_configuration() {
116+
// Create CloudWatch sink configuration
117+
Map<String, Object> awsConfig = Map.of("region", "us-east-1");
118+
Map<String, Object> cloudwatchConfig = Map.of(
119+
"log_group", "/dataprepper/livecapture",
120+
"log_stream", "livecapture-events",
121+
"aws", awsConfig
122+
);
123+
Map<String, Object> sinkConfig = Map.of(
124+
"cloudwatch_logs", cloudwatchConfig,
125+
"entry_threshold", 3,
126+
"batch_size", 10
127+
);
128+
129+
when(dataPrepperConfiguration.getLiveCaptureConfiguration()).thenReturn(liveCaptureConfiguration);
130+
when(liveCaptureConfiguration.isDefaultEnabled()).thenReturn(true);
131+
when(liveCaptureConfiguration.getDefaultRate()).thenReturn(2.0);
132+
when(liveCaptureConfiguration.getLiveCaptureOutputSinkConfig()).thenReturn(sinkConfig);
133+
134+
// Mock PluginFactory to return a mock sink
135+
when(applicationContext.getBean(PluginFactory.class)).thenReturn(pluginFactory);
136+
when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class), any(SinkContext.class))).thenReturn(mockSink);
137+
138+
try (MockedStatic<LiveCaptureManager> mockedLiveCaptureManager = mockStatic(LiveCaptureManager.class);
139+
MockedStatic<LiveCaptureOutputManager> mockedOutputManager = mockStatic(LiveCaptureOutputManager.class)) {
140+
141+
mockedOutputManager.when(LiveCaptureOutputManager::getInstance).thenReturn(mockOutputManager);
142+
143+
liveCaptureAppConfig.initializeLiveCaptureManager();
144+
145+
// Verify LiveCaptureManager initialization
146+
mockedLiveCaptureManager.verify(() -> LiveCaptureManager.initialize(true, 2.0));
147+
148+
// Verify output manager initialization with plugin-based sink
149+
verify(mockOutputManager).initialize(any(Sink.class), any(Integer.class), any(Integer.class));
150+
verify(mockOutputManager).enable();
151+
}
152+
}
153+
154+
@Test
155+
void initializeLiveCaptureManager_with_invalid_sink_configuration() {
156+
// Create invalid configuration (missing log_group)
157+
Map<String, Object> cloudwatchConfig = Map.of("log_stream", "test-stream");
158+
Map<String, Object> sinkConfig = Map.of("cloudwatch_logs", cloudwatchConfig);
159+
160+
when(dataPrepperConfiguration.getLiveCaptureConfiguration()).thenReturn(liveCaptureConfiguration);
161+
when(liveCaptureConfiguration.isDefaultEnabled()).thenReturn(true);
162+
when(liveCaptureConfiguration.getDefaultRate()).thenReturn(2.0);
163+
when(liveCaptureConfiguration.getLiveCaptureOutputSinkConfig()).thenReturn(sinkConfig);
164+
165+
// Mock PluginFactory to simulate failure
166+
when(applicationContext.getBean(PluginFactory.class)).thenReturn(pluginFactory);
167+
when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class), any(SinkContext.class))).thenThrow(new RuntimeException("Plugin creation failed"));
168+
169+
try (MockedStatic<LiveCaptureManager> mockedLiveCaptureManager = mockStatic(LiveCaptureManager.class);
170+
MockedStatic<LiveCaptureOutputManager> mockedOutputManager = mockStatic(LiveCaptureOutputManager.class)) {
171+
172+
mockedOutputManager.when(LiveCaptureOutputManager::getInstance).thenReturn(mockOutputManager);
173+
174+
// Should not throw exception even with invalid config
175+
liveCaptureAppConfig.initializeLiveCaptureManager();
176+
177+
// Should still initialize LiveCaptureManager
178+
mockedLiveCaptureManager.verify(() -> LiveCaptureManager.initialize(true, 2.0));
179+
}
180+
}
181+
182+
@Test
183+
void testPluginBasedSinkFunctionality() {
184+
// This test exercises the plugin-based sink functionality
185+
Map<String, Object> awsConfig = Map.of("region", "us-west-2");
186+
Map<String, Object> cloudwatchConfig = Map.of(
187+
"log_group", "/test/livecapture",
188+
"log_stream", "test-stream",
189+
"aws", awsConfig
190+
);
191+
Map<String, Object> sinkConfig = Map.of("cloudwatch_logs", cloudwatchConfig);
192+
193+
when(dataPrepperConfiguration.getLiveCaptureConfiguration()).thenReturn(liveCaptureConfiguration);
194+
when(liveCaptureConfiguration.isDefaultEnabled()).thenReturn(false);
195+
when(liveCaptureConfiguration.getDefaultRate()).thenReturn(1.0);
196+
when(liveCaptureConfiguration.getLiveCaptureOutputSinkConfig()).thenReturn(sinkConfig);
197+
198+
// Mock PluginFactory to return a mock sink
199+
when(applicationContext.getBean(PluginFactory.class)).thenReturn(pluginFactory);
200+
when(pluginFactory.loadPlugin(any(Class.class), any(PluginSetting.class), any(SinkContext.class))).thenReturn(mockSink);
201+
202+
try (MockedStatic<LiveCaptureManager> mockedLiveCaptureManager = mockStatic(LiveCaptureManager.class);
203+
MockedStatic<LiveCaptureOutputManager> mockedOutputManager = mockStatic(LiveCaptureOutputManager.class)) {
204+
205+
mockedOutputManager.when(LiveCaptureOutputManager::getInstance).thenReturn(mockOutputManager);
206+
207+
liveCaptureAppConfig.initializeLiveCaptureManager();
208+
209+
// Verify that initialize was called with a Sink (which would be our plugin-based sink)
210+
verify(mockOutputManager).initialize(any(Sink.class), any(Integer.class), any(Integer.class));
211+
}
212+
}
213+
}

0 commit comments

Comments
 (0)