From 11fd38bf0a5a72064cef5273c81a16bdb42f5315 Mon Sep 17 00:00:00 2001 From: Sotaro Hikita Date: Sat, 4 Apr 2026 09:42:10 +0900 Subject: [PATCH 1/2] Support shared catalog config across tables in iceberg-source Signed-off-by: Sotaro Hikita --- .../source/iceberg/IcebergSourceIT.java | 81 ++++++++++++++++++- .../source/iceberg/IcebergService.java | 3 +- .../source/iceberg/IcebergSourceConfig.java | 9 +++ .../iceberg/IcebergSourceConfigTest.java | 40 +++++++++ 4 files changed, 131 insertions(+), 2 deletions(-) create mode 100644 data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfigTest.java diff --git a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java index ccfb178e67..0b780b5d0c 100644 --- a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java +++ b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java @@ -140,6 +140,74 @@ void export_writes_all_rows_to_buffer() throws Exception { } } + @Test + void export_with_shared_catalog_config() throws Exception { + final Table table = helper.createTable(TEST_NAMESPACE, TEST_TABLE, TEST_SCHEMA); + helper.appendRows(table, List.of( + helper.newRecord(TEST_SCHEMA, 1, "Alice", 30), + helper.newRecord(TEST_SCHEMA, 2, "Bob", 25) + )); + + final IcebergService service = createService(false, true); + final Buffer> buffer = createMockBuffer(); + service.start(buffer); + + try { + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2)))); + assertThat(receivedRecords, hasSize(2)); + } finally { + service.shutdown(); + } + } + + @Test + void export_with_mixed_catalog_config() throws Exception { + // Table A uses shared catalog, Table B overrides with its own catalog (same catalog in test, but exercises the override path) + final String tableNameA = "mixed_a"; + final String tableNameB = "mixed_b"; + final Table tableA = helper.createTable(TEST_NAMESPACE, tableNameA, TEST_SCHEMA); + final Table tableB = helper.createTable(TEST_NAMESPACE, tableNameB, TEST_SCHEMA); + helper.appendRows(tableA, List.of(helper.newRecord(TEST_SCHEMA, 1, "Alice", 30))); + helper.appendRows(tableB, List.of(helper.newRecord(TEST_SCHEMA, 2, "Bob", 25))); + + final IcebergSourceConfig sourceConfig = mock(IcebergSourceConfig.class); + final TableConfig configA = mock(TableConfig.class); + final TableConfig configB = mock(TableConfig.class); + + when(configA.getTableName()).thenReturn(TEST_NAMESPACE + "." + tableNameA); + when(configA.getCatalog()).thenReturn(Collections.emptyMap()); + when(configA.getIdentifierColumns()).thenReturn(List.of("id")); + when(configA.isDisableExport()).thenReturn(false); + + when(configB.getTableName()).thenReturn(TEST_NAMESPACE + "." + tableNameB); + when(configB.getCatalog()).thenReturn(helper.catalogProperties()); + when(configB.getIdentifierColumns()).thenReturn(List.of("id")); + when(configB.isDisableExport()).thenReturn(false); + + when(sourceConfig.getCatalog()).thenReturn(helper.catalogProperties()); + when(sourceConfig.getTables()).thenReturn(List.of(configA, configB)); + when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5)); + when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + + final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator(); + coordinator.createPartition(new LeaderPartition()); + final IcebergService service = new IcebergService(coordinator, sourceConfig, pluginMetrics, + acknowledgementSetManager, org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory()); + final Buffer> buffer = createMockBuffer(); + service.start(buffer); + + try { + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2)))); + assertThat(receivedRecords, hasSize(2)); + } finally { + service.shutdown(); + helper.dropTable(TEST_NAMESPACE, tableNameA); + helper.dropTable(TEST_NAMESPACE, tableNameB); + } + } + @Test void cdc_insert_produces_insert_events() throws Exception { final CdcTestFixture fixture = createCdcTestFixture(); @@ -284,6 +352,10 @@ private Buffer> createMock } private IcebergService createService(final boolean disableExport) throws Exception { + return createService(disableExport, false); + } + + private IcebergService createService(final boolean disableExport, final boolean useSharedCatalog) throws Exception { final String fullTableName = TEST_NAMESPACE + "." + TEST_TABLE; // Build config via reflection since fields are private @@ -291,10 +363,17 @@ private IcebergService createService(final boolean disableExport) throws Excepti final TableConfig tableConfig = mock(TableConfig.class); when(tableConfig.getTableName()).thenReturn(fullTableName); - when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties()); when(tableConfig.getIdentifierColumns()).thenReturn(List.of("id")); when(tableConfig.isDisableExport()).thenReturn(disableExport); + if (useSharedCatalog) { + when(tableConfig.getCatalog()).thenReturn(Collections.emptyMap()); + when(sourceConfig.getCatalog()).thenReturn(helper.catalogProperties()); + } else { + when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties()); + when(sourceConfig.getCatalog()).thenReturn(Collections.emptyMap()); + } + when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5)); lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java index 2288cef9fe..0cdf940ee9 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java @@ -68,7 +68,8 @@ public void start(final Buffer> buffer) { final String tableName = tableConfig.getTableName(); LOG.info("Loading catalog and table for {}", tableName); - final Map catalogProps = new HashMap<>(tableConfig.getCatalog()); + final Map catalogProps = new HashMap<>( + tableConfig.getCatalog().isEmpty() ? sourceConfig.getCatalog() : tableConfig.getCatalog()); final Catalog catalog = CatalogUtil.buildIcebergCatalog(tableName, catalogProps, null); final TableIdentifier tableId = TableIdentifier.parse(tableName); diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java index 0dac830aed..44ec92f7f9 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java @@ -15,12 +15,17 @@ import jakarta.validation.constraints.NotEmpty; import java.time.Duration; +import java.util.Collections; import java.util.List; +import java.util.Map; public class IcebergSourceConfig { static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(30); + @JsonProperty("catalog") + private Map catalog = Collections.emptyMap(); + @JsonProperty("tables") @NotEmpty @Valid @@ -43,4 +48,8 @@ public Duration getPollingInterval() { public boolean isAcknowledgmentsEnabled() { return acknowledgments; } + + public Map getCatalog() { + return catalog; + } } diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfigTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfigTest.java new file mode 100644 index 0000000000..d0276b132c --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfigTest.java @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +class IcebergSourceConfigTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Test + void default_catalog_is_empty() throws Exception { + final IcebergSourceConfig config = MAPPER.readValue( + "{\"tables\": [{\"table_name\": \"db.t\"}]}", IcebergSourceConfig.class); + assertThat(config.getCatalog().isEmpty(), is(true)); + } + + @Test + void top_level_catalog_is_deserialized() throws Exception { + final String json = "{\"catalog\": {\"type\": \"rest\", \"uri\": \"http://localhost:8181\"}, " + + "\"tables\": [{\"table_name\": \"db.t\"}]}"; + final IcebergSourceConfig config = MAPPER.readValue(json, IcebergSourceConfig.class); + assertThat(config.getCatalog(), equalTo(Map.of("type", "rest", "uri", "http://localhost:8181"))); + } +} From 78d6e9c389f59fb66a4d24c46b795df3d03fa5b0 Mon Sep 17 00:00:00 2001 From: Sotaro Hikita Date: Fri, 10 Apr 2026 10:42:45 +0900 Subject: [PATCH 2/2] Address review feedback for shared catalog config Change catalog fallback check from isEmpty() to null check so that an explicitly empty catalog is not silently treated as unset. TableConfig.catalog default is now null instead of emptyMap. Add IcebergServiceTest to verify catalog selection logic: shared only, table override, and mixed configurations. Replace fully qualified TestEventFactory references with imports in IcebergSourceIT. Signed-off-by: Sotaro Hikita --- .../source/iceberg/IcebergSourceIT.java | 11 +- .../source/iceberg/IcebergService.java | 2 +- .../plugins/source/iceberg/TableConfig.java | 2 +- .../source/iceberg/IcebergServiceTest.java | 166 ++++++++++++++++++ 4 files changed, 174 insertions(+), 7 deletions(-) create mode 100644 data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergServiceTest.java diff --git a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java index 0b780b5d0c..11c9fcc4fa 100644 --- a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java +++ b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java @@ -33,6 +33,7 @@ import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.PartitionFactory; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.event.TestEventFactory; import org.opensearch.dataprepper.plugins.sourcecoordinator.inmemory.InMemorySourceCoordinationStore; import java.time.Duration; @@ -176,7 +177,7 @@ void export_with_mixed_catalog_config() throws Exception { final TableConfig configB = mock(TableConfig.class); when(configA.getTableName()).thenReturn(TEST_NAMESPACE + "." + tableNameA); - when(configA.getCatalog()).thenReturn(Collections.emptyMap()); + when(configA.getCatalog()).thenReturn(null); when(configA.getIdentifierColumns()).thenReturn(List.of("id")); when(configA.isDisableExport()).thenReturn(false); @@ -193,7 +194,7 @@ void export_with_mixed_catalog_config() throws Exception { final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator(); coordinator.createPartition(new LeaderPartition()); final IcebergService service = new IcebergService(coordinator, sourceConfig, pluginMetrics, - acknowledgementSetManager, org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory()); + acknowledgementSetManager, TestEventFactory.getTestEventFactory()); final Buffer> buffer = createMockBuffer(); service.start(buffer); @@ -367,11 +368,11 @@ private IcebergService createService(final boolean disableExport, final boolean when(tableConfig.isDisableExport()).thenReturn(disableExport); if (useSharedCatalog) { - when(tableConfig.getCatalog()).thenReturn(Collections.emptyMap()); + when(tableConfig.getCatalog()).thenReturn(null); when(sourceConfig.getCatalog()).thenReturn(helper.catalogProperties()); } else { when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties()); - when(sourceConfig.getCatalog()).thenReturn(Collections.emptyMap()); + when(sourceConfig.getCatalog()).thenReturn(null); } when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); @@ -382,7 +383,7 @@ private IcebergService createService(final boolean disableExport, final boolean coordinator.createPartition(new LeaderPartition()); return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager, - org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory()); + TestEventFactory.getTestEventFactory()); } private EnhancedSourceCoordinator createInMemoryCoordinator() { diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java index 0cdf940ee9..088d8e7dfc 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java @@ -69,7 +69,7 @@ public void start(final Buffer> buffer) { LOG.info("Loading catalog and table for {}", tableName); final Map catalogProps = new HashMap<>( - tableConfig.getCatalog().isEmpty() ? sourceConfig.getCatalog() : tableConfig.getCatalog()); + tableConfig.getCatalog() != null ? tableConfig.getCatalog() : sourceConfig.getCatalog()); final Catalog catalog = CatalogUtil.buildIcebergCatalog(tableName, catalogProps, null); final TableIdentifier tableId = TableIdentifier.parse(tableName); diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/TableConfig.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/TableConfig.java index 5a3873f32d..1162ff47b0 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/TableConfig.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/TableConfig.java @@ -24,7 +24,7 @@ public class TableConfig { private String tableName; @JsonProperty("catalog") - private Map catalog = Collections.emptyMap(); + private Map catalog; @JsonProperty("identifier_columns") private List identifierColumns = Collections.emptyList(); diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergServiceTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergServiceTest.java new file mode 100644 index 0000000000..3a89c7405e --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergServiceTest.java @@ -0,0 +1,166 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg; + +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; + +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class IcebergServiceTest { + + @Mock + private EnhancedSourceCoordinator sourceCoordinator; + @Mock + private IcebergSourceConfig sourceConfig; + @Mock + private PluginMetrics pluginMetrics; + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private EventFactory eventFactory; + @Mock + private Buffer> buffer; + + @Test + @SuppressWarnings("unchecked") + void start_uses_shared_catalog_when_table_catalog_is_null() { + final Map sharedCatalog = Map.of("type", "rest", "uri", "http://shared:8181"); + final TableConfig tableConfig = createTableConfig("db.my_table", null); + + when(sourceConfig.getCatalog()).thenReturn(sharedCatalog); + when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); + when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5)); + lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + + final Catalog catalog = mock(Catalog.class); + final Table table = mock(Table.class); + when(table.properties()).thenReturn(Collections.emptyMap()); + when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table); + + final ArgumentCaptor> propsCaptor = ArgumentCaptor.forClass(Map.class); + + try (MockedStatic mockedCatalogUtil = mockStatic(CatalogUtil.class)) { + mockedCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull())) + .thenReturn(catalog); + + final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig, + pluginMetrics, acknowledgementSetManager, eventFactory); + service.start(buffer); + service.shutdown(); + + assertThat(propsCaptor.getValue(), equalTo(sharedCatalog)); + } + } + + @Test + @SuppressWarnings("unchecked") + void start_uses_table_catalog_when_table_catalog_is_set() { + final Map tableCatalog = Map.of("type", "glue", "warehouse", "s3://other/"); + final TableConfig tableConfig = createTableConfig("db.my_table", tableCatalog); + + when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); + when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5)); + lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + + final Catalog catalog = mock(Catalog.class); + final Table table = mock(Table.class); + when(table.properties()).thenReturn(Collections.emptyMap()); + when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table); + + final ArgumentCaptor> propsCaptor = ArgumentCaptor.forClass(Map.class); + + try (MockedStatic mockedCatalogUtil = mockStatic(CatalogUtil.class)) { + mockedCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull())) + .thenReturn(catalog); + + final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig, + pluginMetrics, acknowledgementSetManager, eventFactory); + service.start(buffer); + service.shutdown(); + + assertThat(propsCaptor.getValue(), equalTo(tableCatalog)); + } + } + + @Test + @SuppressWarnings("unchecked") + void start_uses_correct_catalog_for_each_table_in_mixed_config() { + final Map sharedCatalog = Map.of("type", "rest", "uri", "http://shared:8181"); + final Map tableBCatalog = Map.of("type", "glue", "warehouse", "s3://other/"); + final TableConfig configA = createTableConfig("db.table_a", null); + final TableConfig configB = createTableConfig("db.table_b", tableBCatalog); + + when(sourceConfig.getCatalog()).thenReturn(sharedCatalog); + when(sourceConfig.getTables()).thenReturn(List.of(configA, configB)); + when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5)); + lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + + final Catalog catalog = mock(Catalog.class); + final Table table = mock(Table.class); + when(table.properties()).thenReturn(Collections.emptyMap()); + when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table); + + final ArgumentCaptor> propsCaptor = ArgumentCaptor.forClass(Map.class); + + try (MockedStatic mockedCatalogUtil = mockStatic(CatalogUtil.class)) { + mockedCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull())) + .thenReturn(catalog); + + final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig, + pluginMetrics, acknowledgementSetManager, eventFactory); + service.start(buffer); + service.shutdown(); + + final List> captured = propsCaptor.getAllValues(); + assertThat(captured.get(0), equalTo(sharedCatalog)); + assertThat(captured.get(1), equalTo(tableBCatalog)); + } + } + + private TableConfig createTableConfig(final String tableName, final Map catalogProps) { + final TableConfig config = mock(TableConfig.class); + when(config.getTableName()).thenReturn(tableName); + when(config.getCatalog()).thenReturn(catalogProps); + when(config.getIdentifierColumns()).thenReturn(Collections.emptyList()); + lenient().when(config.isDisableExport()).thenReturn(false); + return config; + } +}