Skip to content

Commit 2669b70

Browse files
committed
Re-enable ZeroBuffer
Signed-off-by: Mohammed Aghil Puthiyottil <57040494+MohammedAghil@users.noreply.github.com>
1 parent d599a4f commit 2669b70

7 files changed

Lines changed: 103 additions & 4 deletions

File tree

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import org.opensearch.dataprepper.core.peerforwarder.PeerForwardingProcessorDecorator;
1313
import org.opensearch.dataprepper.core.pipeline.Pipeline;
1414
import org.opensearch.dataprepper.core.pipeline.PipelineConnector;
15+
import org.opensearch.dataprepper.core.pipeline.PipelineRunnerImpl;
16+
import org.opensearch.dataprepper.core.pipeline.SupportsPipelineRunner;
1517
import org.opensearch.dataprepper.core.pipeline.router.Router;
1618
import org.opensearch.dataprepper.core.pipeline.router.RouterFactory;
1719
import org.opensearch.dataprepper.core.sourcecoordination.SourceCoordinatorFactory;
@@ -228,10 +230,30 @@ private void buildPipelineFromConfiguration(
228230
dataPrepperConfiguration.getProcessorShutdownTimeout(), dataPrepperConfiguration.getSinkShutdownTimeout(),
229231
getPeerForwarderDrainTimeout(dataPrepperConfiguration));
230232

231-
// TODO: Re-enable zero-buffer
232-
//if (pipelineDefinedBuffer instanceof SupportsPipelineRunner) {
233-
// ((SupportsPipelineRunner) pipelineDefinedBuffer).setPipelineRunner(new PipelineRunnerImpl(pipeline, processors));
234-
//}
233+
if (pipelineDefinedBuffer instanceof SupportsPipelineRunner) {
234+
// Check if there are any processors with @SingleThread annotation
235+
boolean hasSingleThreadedProcessors = processorSets.stream()
236+
.flatMap(List::stream)
237+
.map(IdentifiedComponent::getComponent)
238+
.map(Object::getClass)
239+
.anyMatch(processorClass -> processorClass.isAnnotationPresent(SingleThread.class));
240+
241+
// Only allow ZeroBuffer for single-threaded pipelines with no @SingleThread processors
242+
if (processorThreads == 1 && !hasSingleThreadedProcessors) {
243+
((SupportsPipelineRunner) pipelineDefinedBuffer).setPipelineRunner(
244+
new PipelineRunnerImpl(pipeline, pipeline.getProcessors()));
245+
} else {
246+
if (hasSingleThreadedProcessors) {
247+
throw new IllegalStateException(
248+
"ZeroBuffer cannot be used with @SingleThread processors. " +
249+
"Pipeline [" + pipelineName + "] contains one or more @SingleThread processors.");
250+
} else {
251+
throw new IllegalStateException(
252+
"ZeroBuffer cannot be used with multiple processor threads. " +
253+
"Pipeline [" + pipelineName + "] is configured with " + processorThreads + " threads.");
254+
}
255+
}
256+
}
235257

236258
pipelineMap.put(pipelineName, pipeline);
237259
} catch (Exception ex) {

data-prepper-core/src/test/java/org/opensearch/dataprepper/TestDataProvider.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ public class TestDataProvider {
7171
public static final String VALID_PEER_FORWARDER_WITH_ACM_SSL_CONFIG_FILE = "src/test/resources/valid_peer_forwarder_config_with_acm_ssl.yml";
7272
public static final String VALID_DATA_PREPPER_CONFIG_WITH_METRIC_FILTER = "src/test/resources/valid_data_prepper_config_with_metric_filter.yml";
7373
public static final String INVALID_DATA_PREPPER_CONFIG_WITH_METRIC_FILTER = "src/test/resources/invalid_data_prepper_config_with_metric_filter.yml";
74+
public static final String VALID_ZERO_BUFFER_SINGLE_THREAD_CONFIG_FILE = "src/test/resources/valid_zero_buffer_single_thread.yml";
75+
public static final String INVALID_ZERO_BUFFER_MULTIPLE_THREADS_CONFIG_FILE = "src/test/resources/invalid_zero_buffer_multiple_threads.yml";
76+
public static final String INVALID_ZERO_BUFFER_WITH_SINGLE_THREAD_PROCESSOR_CONFIG_FILE = "src/test/resources/invalid_zero_buffer_with_single_thread_processor.yml";
77+
public static final String INVALID_ZERO_BUFFER_MULTIPLE_THREADS_NO_SINGLE_THREAD_PROCESSORS_CONFIG_FILE = "src/test/resources/invalid_zero_buffer_multiple_threads_no_single_thread_processors.yml";
7478
public static Set<String> VALID_MULTIPLE_PIPELINE_NAMES = new HashSet<>(Arrays.asList("test-pipeline-1",
7579
"test-pipeline-2", "test-pipeline-3"));
7680
}

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/parser/PipelineTransformerTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import static org.hamcrest.Matchers.hasKey;
6868
import static org.hamcrest.core.Is.is;
6969
import static org.junit.jupiter.api.Assertions.assertThrows;
70+
import static org.mockito.Mockito.mock;
7071
import static org.mockito.ArgumentMatchers.anyString;
7172
import static org.mockito.Mockito.doNothing;
7273
import static org.mockito.Mockito.mock;
@@ -603,4 +604,39 @@ private Map<String, Map<String, PeerForwarderReceiveBuffer<Record<Event>>>> gene
603604

604605
return bufferMap;
605606
}
607+
608+
@Test
609+
void parseConfiguration_with_zero_buffer_and_single_worker_thread_creates_pipeline_successfully() {
610+
mockDataPrepperConfigurationAccesses();
611+
final PipelineTransformer pipelineTransformer =
612+
createObjectUnderTest(TestDataProvider.VALID_ZERO_BUFFER_SINGLE_THREAD_CONFIG_FILE);
613+
final Map<String, Pipeline> pipelineMap = pipelineTransformer.transformConfiguration();
614+
615+
assertThat(pipelineMap.size(), equalTo(1));
616+
assertThat(pipelineMap, hasKey("simple-pipeline"));
617+
assertThat(pipelineMap.isEmpty(), equalTo(false));
618+
619+
verifyDataPrepperConfigurationAccesses();
620+
verify(dataPrepperConfiguration).getPipelineExtensions();
621+
}
622+
623+
@ParameterizedTest
624+
@MethodSource("provideInvalidZeroBufferConfigFiles")
625+
void parseConfiguration_with_invalid_zero_buffer_configurations_results_in_pipeline_creation_failure(String configFile) {
626+
mockDataPrepperConfigurationAccesses();
627+
final PipelineTransformer pipelineTransformer = createObjectUnderTest(configFile);
628+
final Map<String, Pipeline> pipelineMap = pipelineTransformer.transformConfiguration();
629+
630+
assertThat(pipelineMap.isEmpty(), equalTo(true));
631+
632+
verify(dataPrepperConfiguration).getPipelineExtensions();
633+
}
634+
635+
private static Stream<Arguments> provideInvalidZeroBufferConfigFiles() {
636+
return Stream.of(
637+
Arguments.of(TestDataProvider.INVALID_ZERO_BUFFER_MULTIPLE_THREADS_CONFIG_FILE),
638+
Arguments.of(TestDataProvider.INVALID_ZERO_BUFFER_WITH_SINGLE_THREAD_PROCESSOR_CONFIG_FILE),
639+
Arguments.of(TestDataProvider.INVALID_ZERO_BUFFER_MULTIPLE_THREADS_NO_SINGLE_THREAD_PROCESSORS_CONFIG_FILE)
640+
);
641+
}
606642
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
simple-sample-pipeline:
2+
workers: 2
3+
source:
4+
random:
5+
buffer:
6+
zero:
7+
sink:
8+
- stdout:
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
simple-pipeline:
2+
workers: 2
3+
source:
4+
random:
5+
buffer:
6+
zero:
7+
processor:
8+
- string_converter:
9+
upper_case: true
10+
sink:
11+
- stdout:
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
simple-pipeline:
2+
workers: 1
3+
source:
4+
random:
5+
buffer:
6+
zero:
7+
processor:
8+
- test_processor:
9+
sink:
10+
- stdout:
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
simple-pipeline:
2+
workers: 1
3+
source:
4+
random:
5+
buffer:
6+
zero:
7+
sink:
8+
- stdout:

0 commit comments

Comments
 (0)