diff --git a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java index ccfb178e67..11c9fcc4fa 100644 --- a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java +++ b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java @@ -33,6 +33,7 @@ import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.PartitionFactory; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.event.TestEventFactory; import org.opensearch.dataprepper.plugins.sourcecoordinator.inmemory.InMemorySourceCoordinationStore; import java.time.Duration; @@ -140,6 +141,74 @@ void export_writes_all_rows_to_buffer() throws Exception { } } + @Test + void export_with_shared_catalog_config() throws Exception { + final Table table = helper.createTable(TEST_NAMESPACE, TEST_TABLE, TEST_SCHEMA); + helper.appendRows(table, List.of( + helper.newRecord(TEST_SCHEMA, 1, "Alice", 30), + helper.newRecord(TEST_SCHEMA, 2, "Bob", 25) + )); + + final IcebergService service = createService(false, true); + final Buffer> buffer = createMockBuffer(); + service.start(buffer); + + try { + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2)))); + assertThat(receivedRecords, hasSize(2)); + } finally { + service.shutdown(); + } + } + + @Test + void export_with_mixed_catalog_config() throws Exception { + // Table A uses shared catalog, Table B overrides with its own catalog (same catalog in test, but exercises the override path) + final String tableNameA = "mixed_a"; + final String tableNameB = "mixed_b"; + final Table tableA = helper.createTable(TEST_NAMESPACE, tableNameA, TEST_SCHEMA); + final Table tableB = helper.createTable(TEST_NAMESPACE, tableNameB, TEST_SCHEMA); + helper.appendRows(tableA, List.of(helper.newRecord(TEST_SCHEMA, 1, "Alice", 30))); + helper.appendRows(tableB, List.of(helper.newRecord(TEST_SCHEMA, 2, "Bob", 25))); + + final IcebergSourceConfig sourceConfig = mock(IcebergSourceConfig.class); + final TableConfig configA = mock(TableConfig.class); + final TableConfig configB = mock(TableConfig.class); + + when(configA.getTableName()).thenReturn(TEST_NAMESPACE + "." + tableNameA); + when(configA.getCatalog()).thenReturn(null); + when(configA.getIdentifierColumns()).thenReturn(List.of("id")); + when(configA.isDisableExport()).thenReturn(false); + + when(configB.getTableName()).thenReturn(TEST_NAMESPACE + "." + tableNameB); + when(configB.getCatalog()).thenReturn(helper.catalogProperties()); + when(configB.getIdentifierColumns()).thenReturn(List.of("id")); + when(configB.isDisableExport()).thenReturn(false); + + when(sourceConfig.getCatalog()).thenReturn(helper.catalogProperties()); + when(sourceConfig.getTables()).thenReturn(List.of(configA, configB)); + when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5)); + when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + + final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator(); + coordinator.createPartition(new LeaderPartition()); + final IcebergService service = new IcebergService(coordinator, sourceConfig, pluginMetrics, + acknowledgementSetManager, TestEventFactory.getTestEventFactory()); + final Buffer> buffer = createMockBuffer(); + service.start(buffer); + + try { + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2)))); + assertThat(receivedRecords, hasSize(2)); + } finally { + service.shutdown(); + helper.dropTable(TEST_NAMESPACE, tableNameA); + helper.dropTable(TEST_NAMESPACE, tableNameB); + } + } + @Test void cdc_insert_produces_insert_events() throws Exception { final CdcTestFixture fixture = createCdcTestFixture(); @@ -284,6 +353,10 @@ private Buffer> createMock } private IcebergService createService(final boolean disableExport) throws Exception { + return createService(disableExport, false); + } + + private IcebergService createService(final boolean disableExport, final boolean useSharedCatalog) throws Exception { final String fullTableName = TEST_NAMESPACE + "." + TEST_TABLE; // Build config via reflection since fields are private @@ -291,10 +364,17 @@ private IcebergService createService(final boolean disableExport) throws Excepti final TableConfig tableConfig = mock(TableConfig.class); when(tableConfig.getTableName()).thenReturn(fullTableName); - when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties()); when(tableConfig.getIdentifierColumns()).thenReturn(List.of("id")); when(tableConfig.isDisableExport()).thenReturn(disableExport); + if (useSharedCatalog) { + when(tableConfig.getCatalog()).thenReturn(null); + when(sourceConfig.getCatalog()).thenReturn(helper.catalogProperties()); + } else { + when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties()); + when(sourceConfig.getCatalog()).thenReturn(null); + } + when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5)); lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); @@ -303,7 +383,7 @@ private IcebergService createService(final boolean disableExport) throws Excepti coordinator.createPartition(new LeaderPartition()); return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager, - org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory()); + TestEventFactory.getTestEventFactory()); } private EnhancedSourceCoordinator createInMemoryCoordinator() { diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java index 2288cef9fe..088d8e7dfc 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java @@ -68,7 +68,8 @@ public void start(final Buffer> buffer) { final String tableName = tableConfig.getTableName(); LOG.info("Loading catalog and table for {}", tableName); - final Map catalogProps = new HashMap<>(tableConfig.getCatalog()); + final Map catalogProps = new HashMap<>( + tableConfig.getCatalog() != null ? tableConfig.getCatalog() : sourceConfig.getCatalog()); final Catalog catalog = CatalogUtil.buildIcebergCatalog(tableName, catalogProps, null); final TableIdentifier tableId = TableIdentifier.parse(tableName); diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java index 0dac830aed..44ec92f7f9 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java @@ -15,12 +15,17 @@ import jakarta.validation.constraints.NotEmpty; import java.time.Duration; +import java.util.Collections; import java.util.List; +import java.util.Map; public class IcebergSourceConfig { static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(30); + @JsonProperty("catalog") + private Map catalog = Collections.emptyMap(); + @JsonProperty("tables") @NotEmpty @Valid @@ -43,4 +48,8 @@ public Duration getPollingInterval() { public boolean isAcknowledgmentsEnabled() { return acknowledgments; } + + public Map getCatalog() { + return catalog; + } } diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/TableConfig.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/TableConfig.java index 5a3873f32d..1162ff47b0 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/TableConfig.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/TableConfig.java @@ -24,7 +24,7 @@ public class TableConfig { private String tableName; @JsonProperty("catalog") - private Map catalog = Collections.emptyMap(); + private Map catalog; @JsonProperty("identifier_columns") private List identifierColumns = Collections.emptyList(); diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergServiceTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergServiceTest.java new file mode 100644 index 0000000000..3a89c7405e --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergServiceTest.java @@ -0,0 +1,166 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg; + +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class IcebergServiceTest { + + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + @Mock + private IcebergSourceConfig sourceConfig; + @Mock + private PluginMetrics pluginMetrics; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private EventFactory eventFactory; + @Mock + private Buffer> buffer; + + @Test + @SuppressWarnings("unchecked") + void start_uses_shared_catalog_when_table_catalog_is_null() { + final Map sharedCatalog = Map.of("type", "rest", "uri", "http://shared:8181"); + final TableConfig tableConfig = createTableConfig("db.my_table", null); + + when(sourceConfig.getCatalog()).thenReturn(sharedCatalog); + when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); + when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5)); + lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + + final Catalog catalog = mock(Catalog.class); + final Table table = mock(Table.class); + when(table.properties()).thenReturn(Collections.emptyMap()); + when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table); + + final ArgumentCaptor> propsCaptor = ArgumentCaptor.forClass(Map.class); + + try (MockedStatic mockedCatalogUtil = mockStatic(CatalogUtil.class)) { + mockedCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull())) + .thenReturn(catalog); + + final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig, + pluginMetrics, acknowledgementSetManager, eventFactory); + service.start(buffer); + service.shutdown(); + + assertThat(propsCaptor.getValue(), equalTo(sharedCatalog)); + } + } + + @Test + @SuppressWarnings("unchecked") + void start_uses_table_catalog_when_table_catalog_is_set() { + final Map tableCatalog = Map.of("type", "glue", "warehouse", "s3://other/"); + final TableConfig tableConfig = createTableConfig("db.my_table", tableCatalog); + + when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); + when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5)); + lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + + final Catalog catalog = mock(Catalog.class); + final Table table = mock(Table.class); + when(table.properties()).thenReturn(Collections.emptyMap()); + when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table); + + final ArgumentCaptor> propsCaptor = ArgumentCaptor.forClass(Map.class); + + try (MockedStatic mockedCatalogUtil = mockStatic(CatalogUtil.class)) { + mockedCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull())) + .thenReturn(catalog); + + final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig, + pluginMetrics, acknowledgementSetManager, eventFactory); + service.start(buffer); + service.shutdown(); + + assertThat(propsCaptor.getValue(), equalTo(tableCatalog)); + } + } + + @Test + @SuppressWarnings("unchecked") + void start_uses_correct_catalog_for_each_table_in_mixed_config() { + final Map sharedCatalog = Map.of("type", "rest", "uri", "http://shared:8181"); + final Map tableBCatalog = Map.of("type", "glue", "warehouse", "s3://other/"); + final TableConfig configA = createTableConfig("db.table_a", null); + final TableConfig configB = createTableConfig("db.table_b", tableBCatalog); + + when(sourceConfig.getCatalog()).thenReturn(sharedCatalog); + when(sourceConfig.getTables()).thenReturn(List.of(configA, configB)); + when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5)); + lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + + final Catalog catalog = mock(Catalog.class); + final Table table = mock(Table.class); + when(table.properties()).thenReturn(Collections.emptyMap()); + when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table); + + final ArgumentCaptor> propsCaptor = ArgumentCaptor.forClass(Map.class); + + try (MockedStatic mockedCatalogUtil = mockStatic(CatalogUtil.class)) { + mockedCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull())) + .thenReturn(catalog); + + final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig, + pluginMetrics, acknowledgementSetManager, eventFactory); + service.start(buffer); + service.shutdown(); + + final List> captured = propsCaptor.getAllValues(); + assertThat(captured.get(0), equalTo(sharedCatalog)); + assertThat(captured.get(1), equalTo(tableBCatalog)); + } + } + + private TableConfig createTableConfig(final String tableName, final Map catalogProps) { + final TableConfig config = mock(TableConfig.class); + when(config.getTableName()).thenReturn(tableName); + when(config.getCatalog()).thenReturn(catalogProps); + when(config.getIdentifierColumns()).thenReturn(Collections.emptyList()); + lenient().when(config.isDisableExport()).thenReturn(false); + return config; + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfigTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfigTest.java new file mode 100644 index 0000000000..d0276b132c --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfigTest.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +class IcebergSourceConfigTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Test + void default_catalog_is_empty() throws Exception { + final IcebergSourceConfig config = MAPPER.readValue( + "{\"tables\": [{\"table_name\": \"db.t\"}]}", IcebergSourceConfig.class); + assertThat(config.getCatalog().isEmpty(), is(true)); + } + + @Test + void top_level_catalog_is_deserialized() throws Exception { + final String json = "{\"catalog\": {\"type\": \"rest\", \"uri\": \"http://localhost:8181\"}, " + + "\"tables\": [{\"table_name\": \"db.t\"}]}"; + final IcebergSourceConfig config = MAPPER.readValue(json, IcebergSourceConfig.class); + assertThat(config.getCatalog(), equalTo(Map.of("type", "rest", "uri", "http://localhost:8181"))); + } +}