Skip to content

Commit d65c426

Browse files
authored
Create new codec for each s3 group in s3 sink (opensearch-project#4410)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent beb2815 commit d65c426

9 files changed

Lines changed: 123 additions & 25 deletions

File tree

data-prepper-plugins/s3-sink/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkServiceIT.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory;
5656
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.InMemoryBufferFactory;
5757
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.ObjectKey;
58+
import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory;
5859
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
5960
import org.opensearch.dataprepper.plugins.sink.s3.configuration.AwsAuthenticationOptions;
6061
import org.opensearch.dataprepper.plugins.sink.s3.configuration.ObjectKeyOptions;
@@ -139,6 +140,9 @@ class S3SinkServiceIT {
139140
private OutputCodec codec;
140141
private KeyGenerator keyGenerator;
141142

143+
@Mock
144+
private CodecFactory codecFactory;
145+
142146
@Mock
143147
NdjsonOutputConfig ndjsonOutputConfig;
144148

@@ -177,6 +181,8 @@ public void setUp() {
177181
@Test
178182
void verify_flushed_object_count_into_s3_bucket() {
179183
configureNewLineCodec();
184+
when(codecFactory.provideCodec()).thenReturn(codec);
185+
180186
int s3ObjectCountBeforeIngest = gets3ObjectCount();
181187
S3SinkService s3SinkService = createObjectUnderTest();
182188
s3SinkService.output(setEventQueue());
@@ -192,6 +198,8 @@ void configureNewLineCodec() {
192198
@Test
193199
void verify_flushed_records_into_s3_bucketNewLine() throws JsonProcessingException {
194200
configureNewLineCodec();
201+
202+
when(codecFactory.provideCodec()).thenReturn(codec);
195203
S3SinkService s3SinkService = createObjectUnderTest();
196204
Collection<Record<Event>> recordsData = setEventQueue();
197205

@@ -221,6 +229,8 @@ void verify_flushed_records_into_s3_bucketNewLine() throws JsonProcessingExcepti
221229
@Test
222230
void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOException {
223231
configureNewLineCodec();
232+
when(codecFactory.provideCodec()).thenReturn(codec);
233+
224234
bufferFactory = new CompressionBufferFactory(bufferFactory, CompressionOption.GZIP.getCompressionEngine(), codec);
225235
S3SinkService s3SinkService = createObjectUnderTest();
226236
Collection<Record<Event>> recordsData = setEventQueue();
@@ -256,9 +266,9 @@ void verify_flushed_records_into_s3_bucketNewLine_with_compression() throws IOEx
256266
private S3SinkService createObjectUnderTest() {
257267
OutputCodecContext codecContext = new OutputCodecContext("Tag", Collections.emptyList(), Collections.emptyList());
258268
final S3GroupIdentifierFactory groupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig);
259-
s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, s3Client);
269+
s3GroupManager = new S3GroupManager(s3SinkConfig, groupIdentifierFactory, bufferFactory, codecFactory, s3Client);
260270

261-
return new S3SinkService(s3SinkConfig, codec, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics, s3GroupManager);
271+
return new S3SinkService(s3SinkConfig, codecContext, s3Client, keyGenerator, Duration.ofSeconds(5), pluginMetrics, s3GroupManager);
262272
}
263273

264274
private int gets3ObjectCount() {
@@ -351,6 +361,8 @@ private static List<HashMap> generateRecords(int numberOfRecords) {
351361
@Disabled
352362
void verify_flushed_records_into_s3_bucket_Parquet() throws IOException {
353363
configureParquetCodec();
364+
when(codecFactory.provideCodec()).thenReturn(codec);
365+
354366
S3SinkService s3SinkService = createObjectUnderTest();
355367
Collection<Record<Event>> recordsData = getRecordList();
356368

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3Sink.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CodecBufferFactory;
2727
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.CompressionBufferFactory;
2828
import org.opensearch.dataprepper.plugins.sink.s3.codec.BufferedCodec;
29+
import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory;
2930
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionEngine;
3031
import org.opensearch.dataprepper.plugins.sink.s3.compression.CompressionOption;
3132
import org.opensearch.dataprepper.plugins.sink.s3.grouping.S3GroupIdentifierFactory;
@@ -48,7 +49,6 @@ public class S3Sink extends AbstractSink<Record<Event>> {
4849

4950
private static final Duration RETRY_FLUSH_BACKOFF = Duration.ofSeconds(5);
5051
private final S3SinkConfig s3SinkConfig;
51-
private final OutputCodec codec;
5252
private volatile boolean sinkInitialized;
5353
private final S3SinkService s3SinkService;
5454
private final BufferFactory bufferFactory;
@@ -70,24 +70,26 @@ public S3Sink(final PluginSetting pluginSetting,
7070
this.s3SinkConfig = s3SinkConfig;
7171
this.sinkContext = sinkContext;
7272
final PluginModel codecConfiguration = s3SinkConfig.getCodec();
73+
final CodecFactory codecFactory = new CodecFactory(pluginFactory, codecConfiguration);
74+
7375
final PluginSetting codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
7476
codecConfiguration.getPluginSettings());
75-
codec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings);
77+
final OutputCodec testCodec = pluginFactory.loadPlugin(OutputCodec.class, codecPluginSettings);
7678
sinkInitialized = Boolean.FALSE;
7779

7880
final S3Client s3Client = ClientFactory.createS3Client(s3SinkConfig, awsCredentialsSupplier);
7981
BufferFactory innerBufferFactory = s3SinkConfig.getBufferType().getBufferFactory();
80-
if(codec instanceof ParquetOutputCodec && s3SinkConfig.getBufferType() != BufferTypeOptions.INMEMORY) {
82+
if(testCodec instanceof ParquetOutputCodec && s3SinkConfig.getBufferType() != BufferTypeOptions.INMEMORY) {
8183
throw new InvalidPluginConfigurationException("The Parquet sink codec is an in_memory buffer only.");
8284
}
83-
if(codec instanceof BufferedCodec) {
84-
innerBufferFactory = new CodecBufferFactory(innerBufferFactory, (BufferedCodec) codec);
85+
if(testCodec instanceof BufferedCodec) {
86+
innerBufferFactory = new CodecBufferFactory(innerBufferFactory, (BufferedCodec) testCodec);
8587
}
8688
CompressionOption compressionOption = s3SinkConfig.getCompression();
8789
final CompressionEngine compressionEngine = compressionOption.getCompressionEngine();
88-
bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, codec);
90+
bufferFactory = new CompressionBufferFactory(innerBufferFactory, compressionEngine, testCodec);
8991

90-
ExtensionProvider extensionProvider = StandardExtensionProvider.create(codec, compressionOption);
92+
ExtensionProvider extensionProvider = StandardExtensionProvider.create(testCodec, compressionOption);
9193
KeyGenerator keyGenerator = new KeyGenerator(s3SinkConfig, extensionProvider, expressionEvaluator);
9294

9395
if (s3SinkConfig.getObjectKeyOptions().getPathPrefix() != null &&
@@ -102,13 +104,13 @@ public S3Sink(final PluginSetting pluginSetting,
102104

103105
S3OutputCodecContext s3OutputCodecContext = new S3OutputCodecContext(OutputCodecContext.fromSinkContext(sinkContext), compressionOption);
104106

105-
codec.validateAgainstCodecContext(s3OutputCodecContext);
107+
testCodec.validateAgainstCodecContext(s3OutputCodecContext);
106108

107109
final S3GroupIdentifierFactory s3GroupIdentifierFactory = new S3GroupIdentifierFactory(keyGenerator, expressionEvaluator, s3SinkConfig);
108-
final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, s3Client);
110+
final S3GroupManager s3GroupManager = new S3GroupManager(s3SinkConfig, s3GroupIdentifierFactory, bufferFactory, codecFactory, s3Client);
109111

110112

111-
s3SinkService = new S3SinkService(s3SinkConfig, codec, s3OutputCodecContext, s3Client, keyGenerator, RETRY_FLUSH_BACKOFF, pluginMetrics, s3GroupManager);
113+
s3SinkService = new S3SinkService(s3SinkConfig, s3OutputCodecContext, s3Client, keyGenerator, RETRY_FLUSH_BACKOFF, pluginMetrics, s3GroupManager);
112114
}
113115

114116
@Override

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/S3SinkService.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ public class S3SinkService {
4848
static final String S3_OBJECTS_SIZE = "s3SinkObjectSizeBytes";
4949
private final S3SinkConfig s3SinkConfig;
5050
private final Lock reentrantLock;
51-
private final OutputCodec codec;
5251
private final S3Client s3Client;
5352
private final int maxEvents;
5453
private final ByteCount maxBytes;
@@ -70,15 +69,13 @@ public class S3SinkService {
7069

7170
/**
7271
* @param s3SinkConfig s3 sink related configuration.
73-
* @param codec parser.
7472
* @param s3Client
7573
* @param pluginMetrics metrics.
7674
*/
77-
public S3SinkService(final S3SinkConfig s3SinkConfig, final OutputCodec codec,
75+
public S3SinkService(final S3SinkConfig s3SinkConfig,
7876
final OutputCodecContext codecContext, final S3Client s3Client, final KeyGenerator keyGenerator,
7977
final Duration retrySleepTime, final PluginMetrics pluginMetrics, final S3GroupManager s3GroupManager) {
8078
this.s3SinkConfig = s3SinkConfig;
81-
this.codec = codec;
8279
this.s3Client = s3Client;
8380
this.codecContext = codecContext;
8481
this.keyGenerator = keyGenerator;
@@ -122,6 +119,7 @@ void output(Collection<Record<Event>> records) {
122119
try {
123120
final S3Group s3Group = s3GroupManager.getOrCreateGroupForEvent(event);
124121
final Buffer currentBuffer = s3Group.getBuffer();
122+
final OutputCodec codec = s3Group.getOutputCodec();
125123

126124
if (currentBuffer.getEventCount() == 0) {
127125
codec.start(currentBuffer.getOutputStream(), event, codecContext);
@@ -178,7 +176,7 @@ private boolean flushToS3IfNeeded(final S3Group s3Group, final boolean forceFlus
178176
s3Group.getBuffer().getSize(), s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getDuration());
179177
if (forceFlush || ThresholdCheck.checkThresholdExceed(s3Group.getBuffer(), maxEvents, maxBytes, maxCollectionDuration)) {
180178
try {
181-
codec.complete(s3Group.getBuffer().getOutputStream());
179+
s3Group.getOutputCodec().complete(s3Group.getBuffer().getOutputStream());
182180
String s3Key = s3Group.getBuffer().getKey();
183181
LOG.info("Writing {} to S3 with {} events and size of {} bytes.",
184182
s3Key, s3Group.getBuffer().getEventCount(), s3Group.getBuffer().getSize());
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.opensearch.dataprepper.plugins.sink.s3.codec;
2+
3+
import org.opensearch.dataprepper.model.codec.OutputCodec;
4+
import org.opensearch.dataprepper.model.configuration.PluginModel;
5+
import org.opensearch.dataprepper.model.configuration.PluginSetting;
6+
import org.opensearch.dataprepper.model.plugin.PluginFactory;
7+
8+
public class CodecFactory {
9+
10+
private final PluginFactory pluginFactory;
11+
12+
private final PluginSetting codecPluginSetting;
13+
14+
public CodecFactory(final PluginFactory pluginFactory,
15+
final PluginModel codecConfiguration) {
16+
this.pluginFactory = pluginFactory;
17+
this.codecPluginSetting = new PluginSetting(codecConfiguration.getPluginName(), codecConfiguration.getPluginSettings());
18+
}
19+
20+
public OutputCodec provideCodec() {
21+
return pluginFactory.loadPlugin(OutputCodec.class, codecPluginSetting);
22+
}
23+
}

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3Group.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.plugins.sink.s3.grouping;
77

8+
import org.opensearch.dataprepper.model.codec.OutputCodec;
89
import org.opensearch.dataprepper.model.event.EventHandle;
910
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer;
1011

@@ -15,21 +16,29 @@ public class S3Group implements Comparable<S3Group> {
1516

1617
private final Buffer buffer;
1718

19+
private OutputCodec outputCodec;
20+
1821
private final S3GroupIdentifier s3GroupIdentifier;
1922

2023
private final Collection<EventHandle> groupEventHandles;
2124

2225
public S3Group(final S3GroupIdentifier s3GroupIdentifier,
23-
final Buffer buffer) {
26+
final Buffer buffer,
27+
final OutputCodec outputCodec) {
2428
this.buffer = buffer;
2529
this.s3GroupIdentifier = s3GroupIdentifier;
30+
this.outputCodec = outputCodec;
2631
this.groupEventHandles = new LinkedList<>();
2732
}
2833

2934
public Buffer getBuffer() {
3035
return buffer;
3136
}
3237

38+
public OutputCodec getOutputCodec() {
39+
return outputCodec;
40+
}
41+
3342
S3GroupIdentifier getS3GroupIdentifier() { return s3GroupIdentifier; }
3443

3544
public void addEventHandle(final EventHandle eventHandle) {

data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
package org.opensearch.dataprepper.plugins.sink.s3.grouping;
77

88
import com.google.common.collect.Maps;
9+
import org.opensearch.dataprepper.model.codec.OutputCodec;
910
import org.opensearch.dataprepper.model.event.Event;
1011
import org.opensearch.dataprepper.plugins.sink.s3.S3SinkConfig;
1112
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.Buffer;
1213
import org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferFactory;
14+
import org.opensearch.dataprepper.plugins.sink.s3.codec.CodecFactory;
1315
import org.slf4j.Logger;
1416
import org.slf4j.LoggerFactory;
1517
import software.amazon.awssdk.services.s3.S3Client;
@@ -25,6 +27,9 @@ public class S3GroupManager {
2527
private final S3SinkConfig s3SinkConfig;
2628
private final S3GroupIdentifierFactory s3GroupIdentifierFactory;
2729
private final BufferFactory bufferFactory;
30+
31+
private final CodecFactory codecFactory;
32+
2833
private final S3Client s3Client;
2934

3035
private long totalGroupSize;
@@ -33,10 +38,12 @@ public class S3GroupManager {
3338
public S3GroupManager(final S3SinkConfig s3SinkConfig,
3439
final S3GroupIdentifierFactory s3GroupIdentifierFactory,
3540
final BufferFactory bufferFactory,
41+
final CodecFactory codecFactory,
3642
final S3Client s3Client) {
3743
this.s3SinkConfig = s3SinkConfig;
3844
this.s3GroupIdentifierFactory = s3GroupIdentifierFactory;
3945
this.bufferFactory = bufferFactory;
46+
this.codecFactory = codecFactory;
4047
this.s3Client = s3Client;
4148
totalGroupSize = 0;
4249
}
@@ -67,7 +74,8 @@ public S3Group getOrCreateGroupForEvent(final Event event) {
6774
return allGroups.get(s3GroupIdentifier);
6875
} else {
6976
final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3SinkConfig::getBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey);
70-
final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup);
77+
final OutputCodec outputCodec = codecFactory.provideCodec();
78+
final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup, outputCodec);
7179
allGroups.put(s3GroupIdentifier, s3Group);
7280
LOG.debug("Created a new S3 group. Total number of groups: {}", allGroups.size());
7381
return s3Group;

0 commit comments

Comments
 (0)