diff --git a/data-prepper-plugins/iceberg-source/build.gradle b/data-prepper-plugins/iceberg-source/build.gradle index 33a96fd42e..bd59f11b00 100644 --- a/data-prepper-plugins/iceberg-source/build.gradle +++ b/data-prepper-plugins/iceberg-source/build.gradle @@ -47,6 +47,7 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' testImplementation project(':data-prepper-test:test-common') + testImplementation project(':data-prepper-test:test-event') integrationTestImplementation project(':data-prepper-plugins:in-memory-source-coordination-store') integrationTestImplementation project(':data-prepper-core') diff --git a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java index 81c0dfb6b7..ccfb178e67 100644 --- a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java +++ b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java @@ -302,7 +302,8 @@ private IcebergService createService(final boolean disableExport) throws Excepti final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator(); coordinator.createPartition(new LeaderPartition()); - return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager); + return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager, + org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory()); } private EnhancedSourceCoordinator createInMemoryCoordinator() { diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java index 968d52eec9..2288cef9fe 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.iceberg.leader.LeaderScheduler; @@ -41,16 +42,19 @@ public class IcebergService { private final IcebergSourceConfig sourceConfig; private final PluginMetrics pluginMetrics; private final AcknowledgementSetManager acknowledgementSetManager; + private final EventFactory eventFactory; private ExecutorService executor; public IcebergService(final EnhancedSourceCoordinator sourceCoordinator, final IcebergSourceConfig sourceConfig, final PluginMetrics pluginMetrics, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final EventFactory eventFactory) { this.sourceCoordinator = sourceCoordinator; this.sourceConfig = sourceConfig; this.pluginMetrics = pluginMetrics; this.acknowledgementSetManager = acknowledgementSetManager; + this.eventFactory = eventFactory; } public void start(final Buffer> buffer) { @@ -98,7 +102,7 @@ public void start(final Buffer> buffer) { runnableList.add(new LeaderScheduler(sourceCoordinator, tableConfigs, sourceConfig.getPollingInterval(), tables)); runnableList.add(new ChangelogWorker( - sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager)); + sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager, eventFactory)); executor = Executors.newFixedThreadPool(runnableList.size()); runnableList.forEach(executor::submit); diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSource.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSource.java index 2f25770503..6338d1f996 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSource.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSource.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.model.annotations.Experimental; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; @@ -40,16 +41,19 @@ public class IcebergSource implements Source>, UsesEnhancedSourceC private final IcebergSourceConfig sourceConfig; private final PluginMetrics pluginMetrics; private final AcknowledgementSetManager acknowledgementSetManager; + private final EventFactory eventFactory; private EnhancedSourceCoordinator sourceCoordinator; private IcebergService icebergService; @DataPrepperPluginConstructor public IcebergSource(final IcebergSourceConfig sourceConfig, final PluginMetrics pluginMetrics, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final EventFactory eventFactory) { this.sourceConfig = sourceConfig; this.pluginMetrics = pluginMetrics; this.acknowledgementSetManager = acknowledgementSetManager; + this.eventFactory = eventFactory; LOG.info("Creating Iceberg Source for {} table(s)", sourceConfig.getTables().size()); } @@ -60,7 +64,7 @@ public void start(final Buffer> buffer) { sourceCoordinator.createPartition(new LeaderPartition()); - icebergService = new IcebergService(sourceCoordinator, sourceConfig, pluginMetrics, acknowledgementSetManager); + icebergService = new IcebergService(sourceCoordinator, sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory); icebergService.start(buffer); } diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverter.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverter.java index af966f2992..772a3b7bf8 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverter.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverter.java @@ -19,7 +19,8 @@ import org.apache.iceberg.variants.VariantValue; import org.apache.iceberg.variants.PhysicalType; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.event.EventBuilder; +import org.opensearch.dataprepper.model.event.EventFactory; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -45,10 +46,12 @@ public class ChangelogRecordConverter { private final String tableName; private final List identifierColumns; + private final EventFactory eventFactory; - public ChangelogRecordConverter(final String tableName, final List identifierColumns) { + public ChangelogRecordConverter(final String tableName, final List identifierColumns, final EventFactory eventFactory) { this.tableName = tableName; this.identifierColumns = identifierColumns; + this.eventFactory = eventFactory; } public Event convert(final Record record, @@ -61,7 +64,7 @@ public Event convert(final Record record, data.put(field.name(), convertValue(value, field.type())); } - final Event event = JacksonEvent.builder() + final Event event = eventFactory.eventBuilder(EventBuilder.class) .withEventType(EVENT_TYPE) .withData(data) .build(); diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogWorker.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogWorker.java index 1ec95b58bf..db044fe0c6 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogWorker.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogWorker.java @@ -28,6 +28,7 @@ import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.iceberg.IcebergSourceConfig; @@ -62,19 +63,22 @@ public class ChangelogWorker implements Runnable { private final Map tableConfigs; private final Buffer> buffer; private final AcknowledgementSetManager acknowledgementSetManager; + private final EventFactory eventFactory; public ChangelogWorker(final EnhancedSourceCoordinator sourceCoordinator, final IcebergSourceConfig sourceConfig, final Map tables, final Map tableConfigs, final Buffer> buffer, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final EventFactory eventFactory) { this.sourceCoordinator = sourceCoordinator; this.sourceConfig = sourceConfig; this.tables = tables; this.tableConfigs = tableConfigs; this.buffer = buffer; this.acknowledgementSetManager = acknowledgementSetManager; + this.eventFactory = eventFactory; } @Override @@ -134,7 +138,7 @@ private void processPartition(final ChangelogTaskPartition partition) throws Exc final Schema schema = table.schema(); final ChangelogRecordConverter converter = new ChangelogRecordConverter( - tableName, tableConfig.getIdentifierColumns()); + tableName, tableConfig.getIdentifierColumns(), eventFactory); final CarryoverRemover carryoverRemover = new CarryoverRemover(); LOG.info("Processing partition for table {} snapshot {} with {} file(s)", @@ -283,7 +287,7 @@ private void processInitialLoadPartition(final InitialLoadTaskPartition partitio final Schema schema = table.schema(); final ChangelogRecordConverter converter = new ChangelogRecordConverter( - tableName, tableConfig.getIdentifierColumns()); + tableName, tableConfig.getIdentifierColumns(), eventFactory); LOG.info("Processing initial load partition for table {} file {}", tableName, state.getDataFilePath()); diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceTest.java index 75d1f65394..5ec4bacf2b 100644 --- a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceTest.java +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceTest.java @@ -16,6 +16,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.event.EventFactory; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import java.util.List; @@ -36,6 +37,9 @@ class IcebergSourceTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private EventFactory eventFactory; + @Mock private EnhancedSourceCoordinator sourceCoordinator; @@ -47,7 +51,7 @@ void areAcknowledgementsEnabled_returnsConfigValue_whenTrue() { when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(true); when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); - final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager); + final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory); assertThat(source.areAcknowledgementsEnabled(), equalTo(true)); } @@ -56,7 +60,7 @@ void areAcknowledgementsEnabled_returnsConfigValue_whenFalse() { when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); - final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager); + final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory); assertThat(source.areAcknowledgementsEnabled(), equalTo(false)); } @@ -64,7 +68,7 @@ void areAcknowledgementsEnabled_returnsConfigValue_whenFalse() { void getPartitionFactory_returnsNonNull() { when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); - final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager); + final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory); assertThat(source.getPartitionFactory() != null, equalTo(true)); } } diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverterTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverterTest.java index b5572f695b..9faba2c33b 100644 --- a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverterTest.java +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogRecordConverterTest.java @@ -20,7 +20,9 @@ import org.apache.iceberg.variants.VariantObject; import org.apache.iceberg.variants.VariantPrimitive; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.event.TestEventFactory; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; import java.math.BigDecimal; import java.time.LocalDate; @@ -36,6 +38,8 @@ class ChangelogRecordConverterTest { + private static final EventFactory EVENT_FACTORY = TestEventFactory.getTestEventFactory(); + private static final Schema TEST_SCHEMA = new Schema( Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "name", Types.StringType.get()), @@ -44,7 +48,7 @@ class ChangelogRecordConverterTest { @Test void convert_insertOperation_setsCorrectMetadata() { - final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY); final Record record = GenericRecord.create(TEST_SCHEMA); record.setField("id", 1); record.setField("name", "Alice"); @@ -64,7 +68,7 @@ void convert_insertOperation_setsCorrectMetadata() { @Test void convert_deleteOperation_setsDeleteBulkAction() { - final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY); final Record record = GenericRecord.create(TEST_SCHEMA); record.setField("id", 1); record.setField("name", "Alice"); @@ -78,7 +82,7 @@ void convert_deleteOperation_setsDeleteBulkAction() { @Test void convert_multipleIdentifierColumns_concatenatedWithPipe() { - final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id", "name")); + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id", "name"), EVENT_FACTORY); final Record record = GenericRecord.create(TEST_SCHEMA); record.setField("id", 1); record.setField("name", "Alice"); @@ -91,7 +95,7 @@ void convert_multipleIdentifierColumns_concatenatedWithPipe() { @Test void convert_noIdentifierColumns_noDocumentId() { - final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of()); + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of(), EVENT_FACTORY); final Record record = GenericRecord.create(TEST_SCHEMA); record.setField("id", 1); record.setField("name", "Alice"); @@ -108,7 +112,7 @@ void convert_decimalType_convertedToString() { Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "price", Types.DecimalType.of(10, 2)) ); - final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY); final Record record = GenericRecord.create(schema); record.setField("id", 1); record.setField("price", new BigDecimal("123.45")); @@ -124,7 +128,7 @@ void convert_dateType_convertedToIsoString() { Types.NestedField.required(1, "id", Types.IntegerType.get()), Types.NestedField.optional(2, "created", Types.DateType.get()) ); - final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY); final Record record = GenericRecord.create(schema); record.setField("id", 1); record.setField("created", LocalDate.of(2024, 1, 15)); @@ -136,7 +140,7 @@ void convert_dateType_convertedToIsoString() { @Test void convert_nullField_preservedAsNull() { - final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY); final Record record = GenericRecord.create(TEST_SCHEMA); record.setField("id", 1); record.setField("name", null); @@ -165,7 +169,7 @@ void convert_structType_preservesFieldNames() { record.setField("id", 1); record.setField("address", addressRecord); - final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY); final Event event = converter.convert(record, schema, "INSERT", 12345L); @SuppressWarnings("unchecked") @@ -199,7 +203,7 @@ void convert_variantType_objectConvertedToMap() { record.setField("id", 1); record.setField("data", variant); - final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY); final Event event = converter.convert(record, schema, "INSERT", 12345L); @SuppressWarnings("unchecked") @@ -237,7 +241,7 @@ void convert_variantType_arrayConvertedToList() { record.setField("id", 1); record.setField("tags", variant); - final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id")); + final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY); final Event event = converter.convert(record, schema, "INSERT", 12345L); @SuppressWarnings("unchecked")