Skip to content

Commit 5aee2c5

Browse files
authored
Use EventFactory instead of JacksonEvent.builder() in iceberg-source (#6641)
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
1 parent 73b809a commit 5aee2c5

8 files changed

Lines changed: 49 additions & 24 deletions

File tree

data-prepper-plugins/iceberg-source/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ dependencies {
4747
implementation 'com.fasterxml.jackson.core:jackson-databind'
4848

4949
testImplementation project(':data-prepper-test:test-common')
50+
testImplementation project(':data-prepper-test:test-event')
5051

5152
integrationTestImplementation project(':data-prepper-plugins:in-memory-source-coordination-store')
5253
integrationTestImplementation project(':data-prepper-core')

data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,8 @@ private IcebergService createService(final boolean disableExport) throws Excepti
302302
final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator();
303303
coordinator.createPartition(new LeaderPartition());
304304

305-
return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager);
305+
return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager,
306+
org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory());
306307
}
307308

308309
private EnhancedSourceCoordinator createInMemoryCoordinator() {

data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
1919
import org.opensearch.dataprepper.model.buffer.Buffer;
2020
import org.opensearch.dataprepper.model.event.Event;
21+
import org.opensearch.dataprepper.model.event.EventFactory;
2122
import org.opensearch.dataprepper.model.record.Record;
2223
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
2324
import org.opensearch.dataprepper.plugins.source.iceberg.leader.LeaderScheduler;
@@ -41,16 +42,19 @@ public class IcebergService {
4142
private final IcebergSourceConfig sourceConfig;
4243
private final PluginMetrics pluginMetrics;
4344
private final AcknowledgementSetManager acknowledgementSetManager;
45+
private final EventFactory eventFactory;
4446
private ExecutorService executor;
4547

4648
public IcebergService(final EnhancedSourceCoordinator sourceCoordinator,
4749
final IcebergSourceConfig sourceConfig,
4850
final PluginMetrics pluginMetrics,
49-
final AcknowledgementSetManager acknowledgementSetManager) {
51+
final AcknowledgementSetManager acknowledgementSetManager,
52+
final EventFactory eventFactory) {
5053
this.sourceCoordinator = sourceCoordinator;
5154
this.sourceConfig = sourceConfig;
5255
this.pluginMetrics = pluginMetrics;
5356
this.acknowledgementSetManager = acknowledgementSetManager;
57+
this.eventFactory = eventFactory;
5458
}
5559

5660
public void start(final Buffer<Record<Event>> buffer) {
@@ -98,7 +102,7 @@ public void start(final Buffer<Record<Event>> buffer) {
98102

99103
runnableList.add(new LeaderScheduler(sourceCoordinator, tableConfigs, sourceConfig.getPollingInterval(), tables));
100104
runnableList.add(new ChangelogWorker(
101-
sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager));
105+
sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager, eventFactory));
102106

103107
executor = Executors.newFixedThreadPool(runnableList.size());
104108
runnableList.forEach(executor::submit);

data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSource.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.opensearch.dataprepper.model.annotations.Experimental;
1818
import org.opensearch.dataprepper.model.buffer.Buffer;
1919
import org.opensearch.dataprepper.model.event.Event;
20+
import org.opensearch.dataprepper.model.event.EventFactory;
2021
import org.opensearch.dataprepper.model.record.Record;
2122
import org.opensearch.dataprepper.model.source.Source;
2223
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
@@ -40,16 +41,19 @@ public class IcebergSource implements Source<Record<Event>>, UsesEnhancedSourceC
4041
private final IcebergSourceConfig sourceConfig;
4142
private final PluginMetrics pluginMetrics;
4243
private final AcknowledgementSetManager acknowledgementSetManager;
44+
private final EventFactory eventFactory;
4345
private EnhancedSourceCoordinator sourceCoordinator;
4446
private IcebergService icebergService;
4547

4648
@DataPrepperPluginConstructor
4749
public IcebergSource(final IcebergSourceConfig sourceConfig,
4850
final PluginMetrics pluginMetrics,
49-
final AcknowledgementSetManager acknowledgementSetManager) {
51+
final AcknowledgementSetManager acknowledgementSetManager,
52+
final EventFactory eventFactory) {
5053
this.sourceConfig = sourceConfig;
5154
this.pluginMetrics = pluginMetrics;
5255
this.acknowledgementSetManager = acknowledgementSetManager;
56+
this.eventFactory = eventFactory;
5357
LOG.info("Creating Iceberg Source for {} table(s)", sourceConfig.getTables().size());
5458
}
5559

@@ -60,7 +64,7 @@ public void start(final Buffer<Record<Event>> buffer) {
6064

6165
sourceCoordinator.createPartition(new LeaderPartition());
6266

63-
icebergService = new IcebergService(sourceCoordinator, sourceConfig, pluginMetrics, acknowledgementSetManager);
67+
icebergService = new IcebergService(sourceCoordinator, sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory);
6468
icebergService.start(buffer);
6569
}
6670

data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverter.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
import org.apache.iceberg.variants.VariantValue;
2020
import org.apache.iceberg.variants.PhysicalType;
2121
import org.opensearch.dataprepper.model.event.Event;
22-
import org.opensearch.dataprepper.model.event.JacksonEvent;
22+
import org.opensearch.dataprepper.model.event.EventBuilder;
23+
import org.opensearch.dataprepper.model.event.EventFactory;
2324

2425
import java.math.BigDecimal;
2526
import java.nio.ByteBuffer;
@@ -45,10 +46,12 @@ public class ChangelogRecordConverter {
4546

4647
private final String tableName;
4748
private final List<String> identifierColumns;
49+
private final EventFactory eventFactory;
4850

49-
public ChangelogRecordConverter(final String tableName, final List<String> identifierColumns) {
51+
public ChangelogRecordConverter(final String tableName, final List<String> identifierColumns, final EventFactory eventFactory) {
5052
this.tableName = tableName;
5153
this.identifierColumns = identifierColumns;
54+
this.eventFactory = eventFactory;
5255
}
5356

5457
public Event convert(final Record record,
@@ -61,7 +64,7 @@ public Event convert(final Record record,
6164
data.put(field.name(), convertValue(value, field.type()));
6265
}
6366

64-
final Event event = JacksonEvent.builder()
67+
final Event event = eventFactory.eventBuilder(EventBuilder.class)
6568
.withEventType(EVENT_TYPE)
6669
.withData(data)
6770
.build();

data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogWorker.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
2929
import org.opensearch.dataprepper.model.buffer.Buffer;
3030
import org.opensearch.dataprepper.model.event.Event;
31+
import org.opensearch.dataprepper.model.event.EventFactory;
3132
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
3233
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
3334
import org.opensearch.dataprepper.plugins.source.iceberg.IcebergSourceConfig;
@@ -62,19 +63,22 @@ public class ChangelogWorker implements Runnable {
6263
private final Map<String, TableConfig> tableConfigs;
6364
private final Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer;
6465
private final AcknowledgementSetManager acknowledgementSetManager;
66+
private final EventFactory eventFactory;
6567

6668
public ChangelogWorker(final EnhancedSourceCoordinator sourceCoordinator,
6769
final IcebergSourceConfig sourceConfig,
6870
final Map<String, Table> tables,
6971
final Map<String, TableConfig> tableConfigs,
7072
final Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer,
71-
final AcknowledgementSetManager acknowledgementSetManager) {
73+
final AcknowledgementSetManager acknowledgementSetManager,
74+
final EventFactory eventFactory) {
7275
this.sourceCoordinator = sourceCoordinator;
7376
this.sourceConfig = sourceConfig;
7477
this.tables = tables;
7578
this.tableConfigs = tableConfigs;
7679
this.buffer = buffer;
7780
this.acknowledgementSetManager = acknowledgementSetManager;
81+
this.eventFactory = eventFactory;
7882
}
7983

8084
@Override
@@ -134,7 +138,7 @@ private void processPartition(final ChangelogTaskPartition partition) throws Exc
134138

135139
final Schema schema = table.schema();
136140
final ChangelogRecordConverter converter = new ChangelogRecordConverter(
137-
tableName, tableConfig.getIdentifierColumns());
141+
tableName, tableConfig.getIdentifierColumns(), eventFactory);
138142
final CarryoverRemover carryoverRemover = new CarryoverRemover();
139143

140144
LOG.info("Processing partition for table {} snapshot {} with {} file(s)",
@@ -283,7 +287,7 @@ private void processInitialLoadPartition(final InitialLoadTaskPartition partitio
283287

284288
final Schema schema = table.schema();
285289
final ChangelogRecordConverter converter = new ChangelogRecordConverter(
286-
tableName, tableConfig.getIdentifierColumns());
290+
tableName, tableConfig.getIdentifierColumns(), eventFactory);
287291

288292
LOG.info("Processing initial load partition for table {} file {}",
289293
tableName, state.getDataFilePath());

data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.mockito.junit.jupiter.MockitoExtension;
1717
import org.opensearch.dataprepper.metrics.PluginMetrics;
1818
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
19+
import org.opensearch.dataprepper.model.event.EventFactory;
1920
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
2021

2122
import java.util.List;
@@ -36,6 +37,9 @@ class IcebergSourceTest {
3637
@Mock
3738
private AcknowledgementSetManager acknowledgementSetManager;
3839

40+
@Mock
41+
private EventFactory eventFactory;
42+
3943
@Mock
4044
private EnhancedSourceCoordinator sourceCoordinator;
4145

@@ -47,7 +51,7 @@ void areAcknowledgementsEnabled_returnsConfigValue_whenTrue() {
4751
when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(true);
4852
when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));
4953

50-
final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager);
54+
final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory);
5155
assertThat(source.areAcknowledgementsEnabled(), equalTo(true));
5256
}
5357

@@ -56,15 +60,15 @@ void areAcknowledgementsEnabled_returnsConfigValue_whenFalse() {
5660
when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
5761
when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));
5862

59-
final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager);
63+
final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory);
6064
assertThat(source.areAcknowledgementsEnabled(), equalTo(false));
6165
}
6266

6367
@Test
6468
void getPartitionFactory_returnsNonNull() {
6569
when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));
6670

67-
final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager);
71+
final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory);
6872
assertThat(source.getPartitionFactory() != null, equalTo(true));
6973
}
7074
}

data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverterTest.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import org.apache.iceberg.variants.VariantObject;
2121
import org.apache.iceberg.variants.VariantPrimitive;
2222
import org.junit.jupiter.api.Test;
23+
import org.opensearch.dataprepper.event.TestEventFactory;
2324
import org.opensearch.dataprepper.model.event.Event;
25+
import org.opensearch.dataprepper.model.event.EventFactory;
2426

2527
import java.math.BigDecimal;
2628
import java.time.LocalDate;
@@ -36,6 +38,8 @@
3638

3739
class ChangelogRecordConverterTest {
3840

41+
private static final EventFactory EVENT_FACTORY = TestEventFactory.getTestEventFactory();
42+
3943
private static final Schema TEST_SCHEMA = new Schema(
4044
Types.NestedField.required(1, "id", Types.IntegerType.get()),
4145
Types.NestedField.optional(2, "name", Types.StringType.get()),
@@ -44,7 +48,7 @@ class ChangelogRecordConverterTest {
4448

4549
@Test
4650
void convert_insertOperation_setsCorrectMetadata() {
47-
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
51+
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
4852
final Record record = GenericRecord.create(TEST_SCHEMA);
4953
record.setField("id", 1);
5054
record.setField("name", "Alice");
@@ -64,7 +68,7 @@ void convert_insertOperation_setsCorrectMetadata() {
6468

6569
@Test
6670
void convert_deleteOperation_setsDeleteBulkAction() {
67-
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
71+
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
6872
final Record record = GenericRecord.create(TEST_SCHEMA);
6973
record.setField("id", 1);
7074
record.setField("name", "Alice");
@@ -78,7 +82,7 @@ void convert_deleteOperation_setsDeleteBulkAction() {
7882

7983
@Test
8084
void convert_multipleIdentifierColumns_concatenatedWithPipe() {
81-
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id", "name"));
85+
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id", "name"), EVENT_FACTORY);
8286
final Record record = GenericRecord.create(TEST_SCHEMA);
8387
record.setField("id", 1);
8488
record.setField("name", "Alice");
@@ -91,7 +95,7 @@ void convert_multipleIdentifierColumns_concatenatedWithPipe() {
9195

9296
@Test
9397
void convert_noIdentifierColumns_noDocumentId() {
94-
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of());
98+
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of(), EVENT_FACTORY);
9599
final Record record = GenericRecord.create(TEST_SCHEMA);
96100
record.setField("id", 1);
97101
record.setField("name", "Alice");
@@ -108,7 +112,7 @@ void convert_decimalType_convertedToString() {
108112
Types.NestedField.required(1, "id", Types.IntegerType.get()),
109113
Types.NestedField.optional(2, "price", Types.DecimalType.of(10, 2))
110114
);
111-
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
115+
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
112116
final Record record = GenericRecord.create(schema);
113117
record.setField("id", 1);
114118
record.setField("price", new BigDecimal("123.45"));
@@ -124,7 +128,7 @@ void convert_dateType_convertedToIsoString() {
124128
Types.NestedField.required(1, "id", Types.IntegerType.get()),
125129
Types.NestedField.optional(2, "created", Types.DateType.get())
126130
);
127-
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
131+
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
128132
final Record record = GenericRecord.create(schema);
129133
record.setField("id", 1);
130134
record.setField("created", LocalDate.of(2024, 1, 15));
@@ -136,7 +140,7 @@ void convert_dateType_convertedToIsoString() {
136140

137141
@Test
138142
void convert_nullField_preservedAsNull() {
139-
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
143+
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
140144
final Record record = GenericRecord.create(TEST_SCHEMA);
141145
record.setField("id", 1);
142146
record.setField("name", null);
@@ -165,7 +169,7 @@ void convert_structType_preservesFieldNames() {
165169
record.setField("id", 1);
166170
record.setField("address", addressRecord);
167171

168-
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
172+
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
169173
final Event event = converter.convert(record, schema, "INSERT", 12345L);
170174

171175
@SuppressWarnings("unchecked")
@@ -199,7 +203,7 @@ void convert_variantType_objectConvertedToMap() {
199203
record.setField("id", 1);
200204
record.setField("data", variant);
201205

202-
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
206+
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
203207
final Event event = converter.convert(record, schema, "INSERT", 12345L);
204208

205209
@SuppressWarnings("unchecked")
@@ -237,7 +241,7 @@ void convert_variantType_arrayConvertedToList() {
237241
record.setField("id", 1);
238242
record.setField("tags", variant);
239243

240-
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
244+
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
241245
final Event event = converter.convert(record, schema, "INSERT", 12345L);
242246

243247
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)