Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.PartitionFactory;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.plugins.sourcecoordinator.inmemory.InMemorySourceCoordinationStore;

import java.time.Duration;
Expand Down Expand Up @@ -140,6 +141,74 @@ void export_writes_all_rows_to_buffer() throws Exception {
}
}

/**
 * Runs an export where the table configuration carries no catalog of its own
 * ({@code createService(false, true)} stubs the table-level catalog as {@code null}
 * and the source-level catalog with the helper's properties) and expects exactly
 * the two seeded rows to arrive in {@code receivedRecords}.
 */
@Test
void export_with_shared_catalog_config() throws Exception {
    // Seed the test table with two rows before the service starts.
    final Table seededTable = helper.createTable(TEST_NAMESPACE, TEST_TABLE, TEST_SCHEMA);
    helper.appendRows(seededTable, List.of(
            helper.newRecord(TEST_SCHEMA, 1, "Alice", 30),
            helper.newRecord(TEST_SCHEMA, 2, "Bob", 25)));

    // Arguments: disableExport = false, useSharedCatalog = true.
    final IcebergService sharedCatalogService = createService(false, true);
    final Buffer<org.opensearch.dataprepper.model.record.Record<Event>> sink = createMockBuffer();
    sharedCatalogService.start(sink);

    try {
        // Wait until at least both rows have shown up, then check nothing extra was emitted.
        await().atMost(30, TimeUnit.SECONDS)
                .untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2))));
        assertThat(receivedRecords, hasSize(2));
    } finally {
        sharedCatalogService.shutdown();
    }
}

@Test
void export_with_mixed_catalog_config() throws Exception {
// Table A uses shared catalog, Table B overrides with its own catalog (same catalog in test, but exercises the override path)
final String tableNameA = "mixed_a";
final String tableNameB = "mixed_b";
final Table tableA = helper.createTable(TEST_NAMESPACE, tableNameA, TEST_SCHEMA);
final Table tableB = helper.createTable(TEST_NAMESPACE, tableNameB, TEST_SCHEMA);
helper.appendRows(tableA, List.of(helper.newRecord(TEST_SCHEMA, 1, "Alice", 30)));
helper.appendRows(tableB, List.of(helper.newRecord(TEST_SCHEMA, 2, "Bob", 25)));

final IcebergSourceConfig sourceConfig = mock(IcebergSourceConfig.class);
final TableConfig configA = mock(TableConfig.class);
final TableConfig configB = mock(TableConfig.class);

when(configA.getTableName()).thenReturn(TEST_NAMESPACE + "." + tableNameA);
when(configA.getCatalog()).thenReturn(null);
when(configA.getIdentifierColumns()).thenReturn(List.of("id"));
when(configA.isDisableExport()).thenReturn(false);

when(configB.getTableName()).thenReturn(TEST_NAMESPACE + "." + tableNameB);
when(configB.getCatalog()).thenReturn(helper.catalogProperties());
when(configB.getIdentifierColumns()).thenReturn(List.of("id"));
when(configB.isDisableExport()).thenReturn(false);

when(sourceConfig.getCatalog()).thenReturn(helper.catalogProperties());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use an alternative catalog to be sure that it is correctly selected?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the integration test environment has a single REST catalog server, using a truly distinct catalog to verify selection is not straightforward. I added IcebergServiceTest with unit tests that use mockStatic(CatalogUtil.class) to capture and assert the exact catalog properties passed to buildIcebergCatalog for each table. The integration test export_with_mixed_catalog_config is kept as a smoke test to confirm the mixed configuration works end to end.

when(sourceConfig.getTables()).thenReturn(List.of(configA, configB));
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);

final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator();
coordinator.createPartition(new LeaderPartition());
final IcebergService service = new IcebergService(coordinator, sourceConfig, pluginMetrics,
acknowledgementSetManager, TestEventFactory.getTestEventFactory());
final Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer = createMockBuffer();
service.start(buffer);

try {
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2))));
assertThat(receivedRecords, hasSize(2));
} finally {
service.shutdown();
helper.dropTable(TEST_NAMESPACE, tableNameA);
helper.dropTable(TEST_NAMESPACE, tableNameB);
}
}

@Test
void cdc_insert_produces_insert_events() throws Exception {
final CdcTestFixture fixture = createCdcTestFixture();
Expand Down Expand Up @@ -284,17 +353,28 @@ private Buffer<org.opensearch.dataprepper.model.record.Record<Event>> createMock
}

/**
 * Convenience overload that builds the service with the catalog stubbed on the
 * table configuration rather than on the source configuration.
 *
 * @param disableExport value the mocked table configuration reports for {@code isDisableExport()}
 * @return the service created by the two-argument overload
 * @throws Exception if the two-argument overload fails
 */
private IcebergService createService(final boolean disableExport) throws Exception {
    final boolean useSharedCatalog = false;
    return createService(disableExport, useSharedCatalog);
}

private IcebergService createService(final boolean disableExport, final boolean useSharedCatalog) throws Exception {
final String fullTableName = TEST_NAMESPACE + "." + TEST_TABLE;

// Build config via reflection since fields are private
final IcebergSourceConfig sourceConfig = mock(IcebergSourceConfig.class);
final TableConfig tableConfig = mock(TableConfig.class);

when(tableConfig.getTableName()).thenReturn(fullTableName);
when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties());
when(tableConfig.getIdentifierColumns()).thenReturn(List.of("id"));
when(tableConfig.isDisableExport()).thenReturn(disableExport);

if (useSharedCatalog) {
when(tableConfig.getCatalog()).thenReturn(null);
when(sourceConfig.getCatalog()).thenReturn(helper.catalogProperties());
} else {
when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties());
when(sourceConfig.getCatalog()).thenReturn(null);
}

when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
Expand All @@ -303,7 +383,7 @@ private IcebergService createService(final boolean disableExport) throws Excepti
coordinator.createPartition(new LeaderPartition());

return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager,
org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory());
TestEventFactory.getTestEventFactory());
}

private EnhancedSourceCoordinator createInMemoryCoordinator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public void start(final Buffer<Record<Event>> buffer) {
final String tableName = tableConfig.getTableName();
LOG.info("Loading catalog and table for {}", tableName);

final Map<String, String> catalogProps = new HashMap<>(tableConfig.getCatalog());
final Map<String, String> catalogProps = new HashMap<>(
tableConfig.getCatalog() != null ? tableConfig.getCatalog() : sourceConfig.getCatalog());
final Catalog catalog = CatalogUtil.buildIcebergCatalog(tableName, catalogProps, null);

final TableIdentifier tableId = TableIdentifier.parse(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
import jakarta.validation.constraints.NotEmpty;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class IcebergSourceConfig {

static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(30);

@JsonProperty("catalog")
private Map<String, String> catalog = Collections.emptyMap();

@JsonProperty("tables")
@NotEmpty
@Valid
Expand All @@ -43,4 +48,8 @@ public Duration getPollingInterval() {
public boolean isAcknowledgmentsEnabled() {
return acknowledgments;
}

/**
 * Returns the catalog properties declared at the source level. The service falls
 * back to these for any table whose own catalog is {@code null}.
 *
 * @return the source-level catalog properties, or an empty map when none are set;
 *         never {@code null}
 */
public Map<String, String> getCatalog() {
    // The field initializer only covers a missing "catalog" key. An explicit null or
    // blank entry is presumably deserialized as null (Jackson's default null handling),
    // which would otherwise reach `new HashMap<>(null)` in the service and throw.
    return catalog != null ? catalog : Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class TableConfig {
private String tableName;

@JsonProperty("catalog")
private Map<String, String> catalog = Collections.emptyMap();
private Map<String, String> catalog;

@JsonProperty("identifier_columns")
private List<String> identifierColumns = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.iceberg;

import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

/**
 * Unit tests for the catalog properties that {@link IcebergService#start} hands to
 * {@code CatalogUtil.buildIcebergCatalog} for each configured table.
 *
 * <p>{@code CatalogUtil} is statically mocked, so the tests only observe which
 * property map is passed in; the returned catalog and table are plain mocks.
 */
@ExtendWith(MockitoExtension.class)
class IcebergServiceTest {

    private static final Map<String, String> SHARED_CATALOG =
            Map.of("type", "rest", "uri", "http://shared:8181");
    private static final Map<String, String> TABLE_CATALOG =
            Map.of("type", "glue", "warehouse", "s3://other/");

    @Mock
    private EnhancedSourceCoordinator sourceCoordinator;
    @Mock
    private IcebergSourceConfig sourceConfig;
    @Mock
    private PluginMetrics pluginMetrics;
    @Mock
    private AcknowledgementSetManager acknowledgementSetManager;
    @Mock
    private EventFactory eventFactory;
    @Mock
    private Buffer<Record<Event>> buffer;

    @Test
    void start_uses_shared_catalog_when_table_catalog_is_null() {
        when(sourceConfig.getCatalog()).thenReturn(SHARED_CATALOG);

        final ArgumentCaptor<Map<String, String>> propsCaptor =
                startServiceAndCaptureCatalogProps(createTableConfig("db.my_table", null));

        assertThat(propsCaptor.getValue(), equalTo(SHARED_CATALOG));
    }

    @Test
    void start_uses_table_catalog_when_table_catalog_is_set() {
        // The source-level catalog is deliberately left unstubbed here, as in the
        // original test: stubbing it would be reported as unnecessary if it is never read.
        final ArgumentCaptor<Map<String, String>> propsCaptor =
                startServiceAndCaptureCatalogProps(createTableConfig("db.my_table", TABLE_CATALOG));

        assertThat(propsCaptor.getValue(), equalTo(TABLE_CATALOG));
    }

    @Test
    void start_uses_correct_catalog_for_each_table_in_mixed_config() {
        when(sourceConfig.getCatalog()).thenReturn(SHARED_CATALOG);

        final ArgumentCaptor<Map<String, String>> propsCaptor = startServiceAndCaptureCatalogProps(
                createTableConfig("db.table_a", null),
                createTableConfig("db.table_b", TABLE_CATALOG));

        // Captured values are in invocation order, matching the order of the configured tables.
        final List<Map<String, String>> captured = propsCaptor.getAllValues();
        assertThat(captured.get(0), equalTo(SHARED_CATALOG));
        assertThat(captured.get(1), equalTo(TABLE_CATALOG));
    }

    /**
     * Stubs the source configuration with the given tables, starts and shuts down an
     * {@link IcebergService} while {@code CatalogUtil} is statically mocked, and returns
     * the captor holding every property map passed to {@code buildIcebergCatalog}.
     *
     * @param tableConfigs table configurations returned by {@code sourceConfig.getTables()}
     * @return captor populated with the catalog properties seen during {@code start}
     */
    @SuppressWarnings("unchecked") // ArgumentCaptor.forClass(Map.class) cannot carry the type arguments
    private ArgumentCaptor<Map<String, String>> startServiceAndCaptureCatalogProps(
            final TableConfig... tableConfigs) {
        when(sourceConfig.getTables()).thenReturn(List.of(tableConfigs));
        when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
        lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);

        final Catalog catalog = mock(Catalog.class);
        final Table table = mock(Table.class);
        when(table.properties()).thenReturn(Collections.emptyMap());
        when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table);

        final ArgumentCaptor<Map<String, String>> propsCaptor = ArgumentCaptor.forClass(Map.class);

        try (MockedStatic<CatalogUtil> mockedCatalogUtil = mockStatic(CatalogUtil.class)) {
            mockedCatalogUtil
                    .when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull()))
                    .thenReturn(catalog);

            final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig,
                    pluginMetrics, acknowledgementSetManager, eventFactory);
            service.start(buffer);
            service.shutdown();
        }
        return propsCaptor;
    }

    /**
     * Builds a mocked {@link TableConfig} with the given name and catalog properties,
     * no identifier columns, and export enabled.
     *
     * @param tableName    value returned by {@code getTableName()}
     * @param catalogProps value returned by {@code getCatalog()}; may be {@code null}
     * @return the stubbed mock
     */
    private TableConfig createTableConfig(final String tableName, final Map<String, String> catalogProps) {
        final TableConfig config = mock(TableConfig.class);
        when(config.getTableName()).thenReturn(tableName);
        when(config.getCatalog()).thenReturn(catalogProps);
        when(config.getIdentifierColumns()).thenReturn(Collections.emptyList());
        lenient().when(config.isDisableExport()).thenReturn(false);
        return config;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.iceberg;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

/**
 * Deserialization tests for the top-level {@code catalog} property of
 * {@link IcebergSourceConfig}.
 */
class IcebergSourceConfigTest {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** With no {@code catalog} key in the JSON, the getter yields an empty map. */
    @Test
    void default_catalog_is_empty() throws Exception {
        final String jsonWithoutCatalog = "{\"tables\": [{\"table_name\": \"db.t\"}]}";

        final IcebergSourceConfig parsed = MAPPER.readValue(jsonWithoutCatalog, IcebergSourceConfig.class);
        final boolean catalogIsEmpty = parsed.getCatalog().isEmpty();

        assertThat(catalogIsEmpty, is(true));
    }

    /** A {@code catalog} object in the JSON is exposed unchanged by the getter. */
    @Test
    void top_level_catalog_is_deserialized() throws Exception {
        final Map<String, String> expectedCatalog = Map.of("type", "rest", "uri", "http://localhost:8181");
        final String jsonWithCatalog = "{\"catalog\": {\"type\": \"rest\", \"uri\": \"http://localhost:8181\"}, " +
                "\"tables\": [{\"table_name\": \"db.t\"}]}";

        final IcebergSourceConfig parsed = MAPPER.readValue(jsonWithCatalog, IcebergSourceConfig.class);

        assertThat(parsed.getCatalog(), equalTo(expectedCatalog));
    }
}
Loading