Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions data-prepper-plugins/iceberg-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'

testImplementation project(':data-prepper-test:test-common')
testImplementation project(':data-prepper-test:test-event')

integrationTestImplementation project(':data-prepper-plugins:in-memory-source-coordination-store')
integrationTestImplementation project(':data-prepper-core')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ private IcebergService createService(final boolean disableExport) throws Excepti
final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator();
coordinator.createPartition(new LeaderPartition());

return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager);
return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager,
org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory());
}

private EnhancedSourceCoordinator createInMemoryCoordinator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.iceberg.leader.LeaderScheduler;
Expand All @@ -41,16 +42,19 @@ public class IcebergService {
private final IcebergSourceConfig sourceConfig;
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;
private final EventFactory eventFactory;
private ExecutorService executor;

public IcebergService(final EnhancedSourceCoordinator sourceCoordinator,
final IcebergSourceConfig sourceConfig,
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager) {
final AcknowledgementSetManager acknowledgementSetManager,
final EventFactory eventFactory) {
this.sourceCoordinator = sourceCoordinator;
this.sourceConfig = sourceConfig;
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.eventFactory = eventFactory;
}

public void start(final Buffer<Record<Event>> buffer) {
Expand Down Expand Up @@ -98,7 +102,7 @@ public void start(final Buffer<Record<Event>> buffer) {

runnableList.add(new LeaderScheduler(sourceCoordinator, tableConfigs, sourceConfig.getPollingInterval(), tables));
runnableList.add(new ChangelogWorker(
sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager));
sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager, eventFactory));

executor = Executors.newFixedThreadPool(runnableList.size());
runnableList.forEach(executor::submit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.annotations.Experimental;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
Expand All @@ -40,16 +41,19 @@ public class IcebergSource implements Source<Record<Event>>, UsesEnhancedSourceC
private final IcebergSourceConfig sourceConfig;
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;
private final EventFactory eventFactory;
private EnhancedSourceCoordinator sourceCoordinator;
private IcebergService icebergService;

@DataPrepperPluginConstructor
public IcebergSource(final IcebergSourceConfig sourceConfig,
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager) {
final AcknowledgementSetManager acknowledgementSetManager,
final EventFactory eventFactory) {
this.sourceConfig = sourceConfig;
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.eventFactory = eventFactory;
LOG.info("Creating Iceberg Source for {} table(s)", sourceConfig.getTables().size());
}

Expand All @@ -60,7 +64,7 @@ public void start(final Buffer<Record<Event>> buffer) {

sourceCoordinator.createPartition(new LeaderPartition());

icebergService = new IcebergService(sourceCoordinator, sourceConfig, pluginMetrics, acknowledgementSetManager);
icebergService = new IcebergService(sourceCoordinator, sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory);
icebergService.start(buffer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import org.apache.iceberg.variants.VariantValue;
import org.apache.iceberg.variants.PhysicalType;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
Expand All @@ -45,10 +46,12 @@ public class ChangelogRecordConverter {

private final String tableName;
private final List<String> identifierColumns;
private final EventFactory eventFactory;

public ChangelogRecordConverter(final String tableName, final List<String> identifierColumns) {
public ChangelogRecordConverter(final String tableName, final List<String> identifierColumns, final EventFactory eventFactory) {
this.tableName = tableName;
this.identifierColumns = identifierColumns;
this.eventFactory = eventFactory;
}

public Event convert(final Record record,
Expand All @@ -61,7 +64,7 @@ public Event convert(final Record record,
data.put(field.name(), convertValue(value, field.type()));
}

final Event event = JacksonEvent.builder()
final Event event = eventFactory.eventBuilder(EventBuilder.class)
.withEventType(EVENT_TYPE)
.withData(data)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.iceberg.IcebergSourceConfig;
Expand Down Expand Up @@ -62,19 +63,22 @@ public class ChangelogWorker implements Runnable {
private final Map<String, TableConfig> tableConfigs;
private final Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer;
private final AcknowledgementSetManager acknowledgementSetManager;
private final EventFactory eventFactory;

public ChangelogWorker(final EnhancedSourceCoordinator sourceCoordinator,
final IcebergSourceConfig sourceConfig,
final Map<String, Table> tables,
final Map<String, TableConfig> tableConfigs,
final Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer,
final AcknowledgementSetManager acknowledgementSetManager) {
final AcknowledgementSetManager acknowledgementSetManager,
final EventFactory eventFactory) {
this.sourceCoordinator = sourceCoordinator;
this.sourceConfig = sourceConfig;
this.tables = tables;
this.tableConfigs = tableConfigs;
this.buffer = buffer;
this.acknowledgementSetManager = acknowledgementSetManager;
this.eventFactory = eventFactory;
}

@Override
Expand Down Expand Up @@ -134,7 +138,7 @@ private void processPartition(final ChangelogTaskPartition partition) throws Exc

final Schema schema = table.schema();
final ChangelogRecordConverter converter = new ChangelogRecordConverter(
tableName, tableConfig.getIdentifierColumns());
tableName, tableConfig.getIdentifierColumns(), eventFactory);
final CarryoverRemover carryoverRemover = new CarryoverRemover();

LOG.info("Processing partition for table {} snapshot {} with {} file(s)",
Expand Down Expand Up @@ -283,7 +287,7 @@ private void processInitialLoadPartition(final InitialLoadTaskPartition partitio

final Schema schema = table.schema();
final ChangelogRecordConverter converter = new ChangelogRecordConverter(
tableName, tableConfig.getIdentifierColumns());
tableName, tableConfig.getIdentifierColumns(), eventFactory);

LOG.info("Processing initial load partition for table {} file {}",
tableName, state.getDataFilePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;

import java.util.List;
Expand All @@ -36,6 +37,9 @@ class IcebergSourceTest {
@Mock
private AcknowledgementSetManager acknowledgementSetManager;

@Mock
private EventFactory eventFactory;

@Mock
private EnhancedSourceCoordinator sourceCoordinator;

Expand All @@ -47,7 +51,7 @@ void areAcknowledgementsEnabled_returnsConfigValue_whenTrue() {
when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(true);
when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));

final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager);
final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory);
assertThat(source.areAcknowledgementsEnabled(), equalTo(true));
}

Expand All @@ -56,15 +60,15 @@ void areAcknowledgementsEnabled_returnsConfigValue_whenFalse() {
when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));

final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager);
final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory);
assertThat(source.areAcknowledgementsEnabled(), equalTo(false));
}

@Test
void getPartitionFactory_returnsNonNull() {
when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));

final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager);
final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory);
assertThat(source.getPartitionFactory() != null, equalTo(true));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import org.apache.iceberg.variants.VariantObject;
import org.apache.iceberg.variants.VariantPrimitive;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;

import java.math.BigDecimal;
import java.time.LocalDate;
Expand All @@ -36,6 +38,8 @@

class ChangelogRecordConverterTest {

private static final EventFactory EVENT_FACTORY = TestEventFactory.getTestEventFactory();

private static final Schema TEST_SCHEMA = new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "name", Types.StringType.get()),
Expand All @@ -44,7 +48,7 @@ class ChangelogRecordConverterTest {

@Test
void convert_insertOperation_setsCorrectMetadata() {
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
final Record record = GenericRecord.create(TEST_SCHEMA);
record.setField("id", 1);
record.setField("name", "Alice");
Expand All @@ -64,7 +68,7 @@ void convert_insertOperation_setsCorrectMetadata() {

@Test
void convert_deleteOperation_setsDeleteBulkAction() {
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
final Record record = GenericRecord.create(TEST_SCHEMA);
record.setField("id", 1);
record.setField("name", "Alice");
Expand All @@ -78,7 +82,7 @@ void convert_deleteOperation_setsDeleteBulkAction() {

@Test
void convert_multipleIdentifierColumns_concatenatedWithPipe() {
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id", "name"));
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id", "name"), EVENT_FACTORY);
final Record record = GenericRecord.create(TEST_SCHEMA);
record.setField("id", 1);
record.setField("name", "Alice");
Expand All @@ -91,7 +95,7 @@ void convert_multipleIdentifierColumns_concatenatedWithPipe() {

@Test
void convert_noIdentifierColumns_noDocumentId() {
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of());
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of(), EVENT_FACTORY);
final Record record = GenericRecord.create(TEST_SCHEMA);
record.setField("id", 1);
record.setField("name", "Alice");
Expand All @@ -108,7 +112,7 @@ void convert_decimalType_convertedToString() {
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "price", Types.DecimalType.of(10, 2))
);
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
final Record record = GenericRecord.create(schema);
record.setField("id", 1);
record.setField("price", new BigDecimal("123.45"));
Expand All @@ -124,7 +128,7 @@ void convert_dateType_convertedToIsoString() {
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "created", Types.DateType.get())
);
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
final Record record = GenericRecord.create(schema);
record.setField("id", 1);
record.setField("created", LocalDate.of(2024, 1, 15));
Expand All @@ -136,7 +140,7 @@ void convert_dateType_convertedToIsoString() {

@Test
void convert_nullField_preservedAsNull() {
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
final Record record = GenericRecord.create(TEST_SCHEMA);
record.setField("id", 1);
record.setField("name", null);
Expand Down Expand Up @@ -165,7 +169,7 @@ void convert_structType_preservesFieldNames() {
record.setField("id", 1);
record.setField("address", addressRecord);

final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
final Event event = converter.convert(record, schema, "INSERT", 12345L);

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -199,7 +203,7 @@ void convert_variantType_objectConvertedToMap() {
record.setField("id", 1);
record.setField("data", variant);

final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
final Event event = converter.convert(record, schema, "INSERT", 12345L);

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -237,7 +241,7 @@ void convert_variantType_arrayConvertedToList() {
record.setField("id", 1);
record.setField("tags", variant);

final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"));
final ChangelogRecordConverter converter = new ChangelogRecordConverter("test_table", List.of("id"), EVENT_FACTORY);
final Event event = converter.convert(record, schema, "INSERT", 12345L);

@SuppressWarnings("unchecked")
Expand Down
Loading