1+ /*
2+ * Copyright OpenSearch Contributors
3+ * SPDX-License-Identifier: Apache-2.0
4+ */
5+
6+ package org .opensearch .dataprepper .core .livecapture ;
7+
8+ import org .opensearch .dataprepper .core .parser .model .DataPrepperConfiguration ;
9+ import org .opensearch .dataprepper .model .configuration .PluginSetting ;
10+ import org .opensearch .dataprepper .model .event .Event ;
11+ import org .opensearch .dataprepper .model .event .EventFactory ;
12+ import org .opensearch .dataprepper .model .plugin .PluginFactory ;
13+ import org .opensearch .dataprepper .model .record .Record ;
14+ import org .opensearch .dataprepper .model .sink .Sink ;
15+ import org .opensearch .dataprepper .model .sink .SinkContext ;
16+ import org .slf4j .Logger ;
17+ import org .slf4j .LoggerFactory ;
18+ import org .springframework .beans .BeansException ;
19+ import org .springframework .context .ApplicationContext ;
20+ import org .springframework .context .ApplicationContextAware ;
21+ import org .springframework .context .annotation .Bean ;
22+ import org .springframework .context .annotation .Configuration ;
23+ import org .springframework .context .event .ContextRefreshedEvent ;
24+ import org .springframework .context .event .EventListener ;
25+
26+ import javax .annotation .PreDestroy ;
27+ import javax .inject .Inject ;
28+ import java .util .Map ;
29+
30+ /**
31+ * Spring configuration for live capture functionality.
32+ * Sets up beans and initializes the live capture manager with configuration values.
33+ */
34+ @ Configuration
35+ public class LiveCaptureAppConfig implements ApplicationContextAware {
36+
37+ private static final Logger LOG = LoggerFactory .getLogger (LiveCaptureAppConfig .class );
38+
39+ private final DataPrepperConfiguration dataPrepperConfiguration ;
40+ private ApplicationContext applicationContext ;
41+
42+ @ Inject
43+ public LiveCaptureAppConfig (final DataPrepperConfiguration dataPrepperConfiguration ) {
44+ this .dataPrepperConfiguration = dataPrepperConfiguration ;
45+ }
46+
47+ @ Override
48+ public void setApplicationContext (ApplicationContext applicationContext ) throws BeansException {
49+ this .applicationContext = applicationContext ;
50+ }
51+
52+
53+ @ EventListener (ContextRefreshedEvent .class )
54+ public void initializeLiveCaptureManager () {
55+ final LiveCaptureConfiguration liveCaptureConfig = dataPrepperConfiguration .getLiveCaptureConfiguration ();
56+
57+ boolean defaultEnabled = liveCaptureConfig != null && liveCaptureConfig .isDefaultEnabled ();
58+ double defaultRate = liveCaptureConfig != null ? liveCaptureConfig .getDefaultRate () : 1.0 ;
59+
60+ LiveCaptureManager .initialize (defaultEnabled , defaultRate );
61+ LOG .info ("LiveCaptureManager initialized with default enabled: {}, default rate: {}" ,
62+ defaultEnabled , defaultRate );
63+
64+ // Initialize LiveCaptureOutputManager if sink configuration is present
65+ if (liveCaptureConfig != null && liveCaptureConfig .getLiveCaptureOutputSinkConfig () != null ) {
66+ initializeLiveCaptureOutputManager (liveCaptureConfig );
67+
68+ // Sync output manager state with live capture default state
69+ if (defaultEnabled ) {
70+ LiveCaptureOutputManager .getInstance ().enable ();
71+ } else {
72+ LiveCaptureOutputManager .getInstance ().disable ();
73+ }
74+ }
75+ }
76+
77+ /**
78+ * Initializes the LiveCaptureOutputManager with the configured sink.
79+ */
80+ private void initializeLiveCaptureOutputManager (final LiveCaptureConfiguration liveCaptureConfig ) {
81+ Object sinkConfig = liveCaptureConfig .getLiveCaptureOutputSinkConfig ();
82+ if (!(sinkConfig instanceof Map )) {
83+ return ;
84+ }
85+
86+ @ SuppressWarnings ("unchecked" )
87+ Map <String , Object > sinkConfigMap = (Map <String , Object >) sinkConfig ;
88+
89+ int entryThreshold = getIntValue (sinkConfigMap , "entry_threshold" , 1 );
90+ int batchSize = getIntValue (sinkConfigMap , "batch_size" , 1 );
91+
92+ Sink <Record <Event >> eventSink = null ;
93+ for (Map .Entry <String , Object > entry : sinkConfigMap .entrySet ()) {
94+ if (!"entry_threshold" .equals (entry .getKey ()) && !"batch_size" .equals (entry .getKey ())) {
95+ eventSink = createPluginBasedSink (entry .getKey (), entry .getValue ());
96+ break ;
97+ }
98+ }
99+
100+ if (eventSink != null ) {
101+ LiveCaptureOutputManager .getInstance ().initialize (eventSink , entryThreshold , batchSize );
102+ LOG .info ("LiveCaptureOutputManager initialized with sink: {}" , eventSink .getClass ().getSimpleName ());
103+ }
104+ }
105+
106+ private int getIntValue (Map <String , Object > map , String key , int defaultValue ) {
107+ Object value = map .get (key );
108+ return value instanceof Number ? ((Number ) value ).intValue () : defaultValue ;
109+ }
110+
111+ private Sink <Record <Event >> createPluginBasedSink (String sinkType , Object sinkSettings ) {
112+ if (!(sinkSettings instanceof Map )) {
113+ return null ;
114+ }
115+
116+ @ SuppressWarnings ("unchecked" )
117+ Map <String , Object > settingsMap = (Map <String , Object >) sinkSettings ;
118+
119+ PluginFactory pluginFactory = applicationContext .getBean (PluginFactory .class );
120+ PluginSetting pluginSetting = new PluginSetting (sinkType , settingsMap );
121+ pluginSetting .setPipelineName ("live-capture-pipeline" );
122+
123+ @ SuppressWarnings ("unchecked" )
124+ Sink <Record <Event >> sink = (Sink <Record <Event >>) pluginFactory .loadPlugin (
125+ Sink .class , pluginSetting , new SinkContext (null ));
126+
127+ return sink ;
128+ }
129+
130+
131+ @ PreDestroy
132+ public void shutdownLiveCapture () {
133+ LiveCaptureOutputManager .getInstance ().shutdown ();
134+ }
135+
136+ /**
137+ * Creates the LiveCaptureHandler bean for handling REST API requests.
138+ *
139+ * @param eventFactory the event factory to use
140+ * @return the LiveCaptureHandler instance
141+ */
142+ @ Bean
143+ public LiveCaptureHandler liveCaptureHandler (final EventFactory eventFactory ) {
144+ return new LiveCaptureHandler (eventFactory );
145+ }
146+
147+
148+ }
0 commit comments