Skip to content

Commit 8ea59d9

Browse files
authored
Support shared catalog config across tables in iceberg-source (opensearch-project#6727)
Support shared catalog config across tables in iceberg-source Change catalog fallback check from isEmpty() to null check so that an explicitly empty catalog is not silently treated as unset. TableConfig.catalog default is now null instead of emptyMap. Add IcebergServiceTest to verify catalog selection logic: shared only, table override, and mixed configurations. Replace fully qualified TestEventFactory references with imports in IcebergSourceIT. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
1 parent c39d6b0 commit 8ea59d9

6 files changed

Lines changed: 300 additions & 4 deletions

File tree

data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
3434
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.PartitionFactory;
3535
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition;
36+
import org.opensearch.dataprepper.event.TestEventFactory;
3637
import org.opensearch.dataprepper.plugins.sourcecoordinator.inmemory.InMemorySourceCoordinationStore;
3738

3839
import java.time.Duration;
@@ -140,6 +141,74 @@ void export_writes_all_rows_to_buffer() throws Exception {
140141
}
141142
}
142143

144+
@Test
145+
void export_with_shared_catalog_config() throws Exception {
146+
final Table table = helper.createTable(TEST_NAMESPACE, TEST_TABLE, TEST_SCHEMA);
147+
helper.appendRows(table, List.of(
148+
helper.newRecord(TEST_SCHEMA, 1, "Alice", 30),
149+
helper.newRecord(TEST_SCHEMA, 2, "Bob", 25)
150+
));
151+
152+
final IcebergService service = createService(false, true);
153+
final Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer = createMockBuffer();
154+
service.start(buffer);
155+
156+
try {
157+
await().atMost(30, TimeUnit.SECONDS)
158+
.untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2))));
159+
assertThat(receivedRecords, hasSize(2));
160+
} finally {
161+
service.shutdown();
162+
}
163+
}
164+
165+
@Test
166+
void export_with_mixed_catalog_config() throws Exception {
167+
// Table A uses shared catalog, Table B overrides with its own catalog (same catalog in test, but exercises the override path)
168+
final String tableNameA = "mixed_a";
169+
final String tableNameB = "mixed_b";
170+
final Table tableA = helper.createTable(TEST_NAMESPACE, tableNameA, TEST_SCHEMA);
171+
final Table tableB = helper.createTable(TEST_NAMESPACE, tableNameB, TEST_SCHEMA);
172+
helper.appendRows(tableA, List.of(helper.newRecord(TEST_SCHEMA, 1, "Alice", 30)));
173+
helper.appendRows(tableB, List.of(helper.newRecord(TEST_SCHEMA, 2, "Bob", 25)));
174+
175+
final IcebergSourceConfig sourceConfig = mock(IcebergSourceConfig.class);
176+
final TableConfig configA = mock(TableConfig.class);
177+
final TableConfig configB = mock(TableConfig.class);
178+
179+
when(configA.getTableName()).thenReturn(TEST_NAMESPACE + "." + tableNameA);
180+
when(configA.getCatalog()).thenReturn(null);
181+
when(configA.getIdentifierColumns()).thenReturn(List.of("id"));
182+
when(configA.isDisableExport()).thenReturn(false);
183+
184+
when(configB.getTableName()).thenReturn(TEST_NAMESPACE + "." + tableNameB);
185+
when(configB.getCatalog()).thenReturn(helper.catalogProperties());
186+
when(configB.getIdentifierColumns()).thenReturn(List.of("id"));
187+
when(configB.isDisableExport()).thenReturn(false);
188+
189+
when(sourceConfig.getCatalog()).thenReturn(helper.catalogProperties());
190+
when(sourceConfig.getTables()).thenReturn(List.of(configA, configB));
191+
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
192+
when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
193+
194+
final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator();
195+
coordinator.createPartition(new LeaderPartition());
196+
final IcebergService service = new IcebergService(coordinator, sourceConfig, pluginMetrics,
197+
acknowledgementSetManager, TestEventFactory.getTestEventFactory());
198+
final Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer = createMockBuffer();
199+
service.start(buffer);
200+
201+
try {
202+
await().atMost(30, TimeUnit.SECONDS)
203+
.untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2))));
204+
assertThat(receivedRecords, hasSize(2));
205+
} finally {
206+
service.shutdown();
207+
helper.dropTable(TEST_NAMESPACE, tableNameA);
208+
helper.dropTable(TEST_NAMESPACE, tableNameB);
209+
}
210+
}
211+
143212
@Test
144213
void cdc_insert_produces_insert_events() throws Exception {
145214
final CdcTestFixture fixture = createCdcTestFixture();
@@ -284,17 +353,28 @@ private Buffer<org.opensearch.dataprepper.model.record.Record<Event>> createMock
284353
}
285354

286355
private IcebergService createService(final boolean disableExport) throws Exception {
356+
return createService(disableExport, false);
357+
}
358+
359+
private IcebergService createService(final boolean disableExport, final boolean useSharedCatalog) throws Exception {
287360
final String fullTableName = TEST_NAMESPACE + "." + TEST_TABLE;
288361

289362
// Build config via reflection since fields are private
290363
final IcebergSourceConfig sourceConfig = mock(IcebergSourceConfig.class);
291364
final TableConfig tableConfig = mock(TableConfig.class);
292365

293366
when(tableConfig.getTableName()).thenReturn(fullTableName);
294-
when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties());
295367
when(tableConfig.getIdentifierColumns()).thenReturn(List.of("id"));
296368
when(tableConfig.isDisableExport()).thenReturn(disableExport);
297369

370+
if (useSharedCatalog) {
371+
when(tableConfig.getCatalog()).thenReturn(null);
372+
when(sourceConfig.getCatalog()).thenReturn(helper.catalogProperties());
373+
} else {
374+
when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties());
375+
when(sourceConfig.getCatalog()).thenReturn(null);
376+
}
377+
298378
when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));
299379
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
300380
lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
@@ -303,7 +383,7 @@ private IcebergService createService(final boolean disableExport) throws Excepti
303383
coordinator.createPartition(new LeaderPartition());
304384

305385
return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager,
306-
org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory());
386+
TestEventFactory.getTestEventFactory());
307387
}
308388

309389
private EnhancedSourceCoordinator createInMemoryCoordinator() {

data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public void start(final Buffer<Record<Event>> buffer) {
6868
final String tableName = tableConfig.getTableName();
6969
LOG.info("Loading catalog and table for {}", tableName);
7070

71-
final Map<String, String> catalogProps = new HashMap<>(tableConfig.getCatalog());
71+
final Map<String, String> catalogProps = new HashMap<>(
72+
tableConfig.getCatalog() != null ? tableConfig.getCatalog() : sourceConfig.getCatalog());
7273
final Catalog catalog = CatalogUtil.buildIcebergCatalog(tableName, catalogProps, null);
7374

7475
final TableIdentifier tableId = TableIdentifier.parse(tableName);

data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,17 @@
1515
import jakarta.validation.constraints.NotEmpty;
1616

1717
import java.time.Duration;
18+
import java.util.Collections;
1819
import java.util.List;
20+
import java.util.Map;
1921

2022
public class IcebergSourceConfig {
2123

2224
static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(30);
2325

26+
@JsonProperty("catalog")
27+
private Map<String, String> catalog = Collections.emptyMap();
28+
2429
@JsonProperty("tables")
2530
@NotEmpty
2631
@Valid
@@ -43,4 +48,8 @@ public Duration getPollingInterval() {
4348
public boolean isAcknowledgmentsEnabled() {
4449
return acknowledgments;
4550
}
51+
52+
public Map<String, String> getCatalog() {
53+
return catalog;
54+
}
4655
}

data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/TableConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class TableConfig {
2424
private String tableName;
2525

2626
@JsonProperty("catalog")
27-
private Map<String, String> catalog = Collections.emptyMap();
27+
private Map<String, String> catalog;
2828

2929
@JsonProperty("identifier_columns")
3030
private List<String> identifierColumns = Collections.emptyList();
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.iceberg;
12+
13+
import org.apache.iceberg.CatalogUtil;
14+
import org.apache.iceberg.Table;
15+
import org.apache.iceberg.catalog.Catalog;
16+
import org.apache.iceberg.catalog.TableIdentifier;
17+
import org.junit.jupiter.api.Test;
18+
import org.junit.jupiter.api.extension.ExtendWith;
19+
import org.mockito.ArgumentCaptor;
20+
import org.mockito.Mock;
21+
import org.mockito.MockedStatic;
22+
import org.mockito.junit.jupiter.MockitoExtension;
23+
import org.opensearch.dataprepper.metrics.PluginMetrics;
24+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
25+
import org.opensearch.dataprepper.model.buffer.Buffer;
26+
import org.opensearch.dataprepper.model.event.Event;
27+
import org.opensearch.dataprepper.model.event.EventFactory;
28+
import org.opensearch.dataprepper.model.record.Record;
29+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
30+
31+
import java.time.Duration;
32+
import java.util.Collections;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
import static org.hamcrest.MatcherAssert.assertThat;
37+
import static org.hamcrest.Matchers.equalTo;
38+
import static org.mockito.ArgumentMatchers.any;
39+
import static org.mockito.ArgumentMatchers.anyString;
40+
import static org.mockito.ArgumentMatchers.isNull;
41+
import static org.mockito.Mockito.lenient;
42+
import static org.mockito.Mockito.mock;
43+
import static org.mockito.Mockito.mockStatic;
44+
import static org.mockito.Mockito.when;
45+
46+
@ExtendWith(MockitoExtension.class)
47+
class IcebergServiceTest {
48+
49+
@Mock
50+
private EnhancedSourceCoordinator sourceCoordinator;
51+
@Mock
52+
private IcebergSourceConfig sourceConfig;
53+
@Mock
54+
private PluginMetrics pluginMetrics;
55+
@Mock
56+
private AcknowledgementSetManager acknowledgementSetManager;
57+
@Mock
58+
private EventFactory eventFactory;
59+
@Mock
60+
private Buffer<Record<Event>> buffer;
61+
62+
@Test
63+
@SuppressWarnings("unchecked")
64+
void start_uses_shared_catalog_when_table_catalog_is_null() {
65+
final Map<String, String> sharedCatalog = Map.of("type", "rest", "uri", "http://shared:8181");
66+
final TableConfig tableConfig = createTableConfig("db.my_table", null);
67+
68+
when(sourceConfig.getCatalog()).thenReturn(sharedCatalog);
69+
when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));
70+
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
71+
lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
72+
73+
final Catalog catalog = mock(Catalog.class);
74+
final Table table = mock(Table.class);
75+
when(table.properties()).thenReturn(Collections.emptyMap());
76+
when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table);
77+
78+
final ArgumentCaptor<Map<String, String>> propsCaptor = ArgumentCaptor.forClass(Map.class);
79+
80+
try (MockedStatic<CatalogUtil> mockedCatalogUtil = mockStatic(CatalogUtil.class)) {
81+
mockedCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull()))
82+
.thenReturn(catalog);
83+
84+
final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig,
85+
pluginMetrics, acknowledgementSetManager, eventFactory);
86+
service.start(buffer);
87+
service.shutdown();
88+
89+
assertThat(propsCaptor.getValue(), equalTo(sharedCatalog));
90+
}
91+
}
92+
93+
@Test
94+
@SuppressWarnings("unchecked")
95+
void start_uses_table_catalog_when_table_catalog_is_set() {
96+
final Map<String, String> tableCatalog = Map.of("type", "glue", "warehouse", "s3://other/");
97+
final TableConfig tableConfig = createTableConfig("db.my_table", tableCatalog);
98+
99+
when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));
100+
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
101+
lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
102+
103+
final Catalog catalog = mock(Catalog.class);
104+
final Table table = mock(Table.class);
105+
when(table.properties()).thenReturn(Collections.emptyMap());
106+
when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table);
107+
108+
final ArgumentCaptor<Map<String, String>> propsCaptor = ArgumentCaptor.forClass(Map.class);
109+
110+
try (MockedStatic<CatalogUtil> mockedCatalogUtil = mockStatic(CatalogUtil.class)) {
111+
mockedCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull()))
112+
.thenReturn(catalog);
113+
114+
final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig,
115+
pluginMetrics, acknowledgementSetManager, eventFactory);
116+
service.start(buffer);
117+
service.shutdown();
118+
119+
assertThat(propsCaptor.getValue(), equalTo(tableCatalog));
120+
}
121+
}
122+
123+
@Test
124+
@SuppressWarnings("unchecked")
125+
void start_uses_correct_catalog_for_each_table_in_mixed_config() {
126+
final Map<String, String> sharedCatalog = Map.of("type", "rest", "uri", "http://shared:8181");
127+
final Map<String, String> tableBCatalog = Map.of("type", "glue", "warehouse", "s3://other/");
128+
final TableConfig configA = createTableConfig("db.table_a", null);
129+
final TableConfig configB = createTableConfig("db.table_b", tableBCatalog);
130+
131+
when(sourceConfig.getCatalog()).thenReturn(sharedCatalog);
132+
when(sourceConfig.getTables()).thenReturn(List.of(configA, configB));
133+
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
134+
lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
135+
136+
final Catalog catalog = mock(Catalog.class);
137+
final Table table = mock(Table.class);
138+
when(table.properties()).thenReturn(Collections.emptyMap());
139+
when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table);
140+
141+
final ArgumentCaptor<Map<String, String>> propsCaptor = ArgumentCaptor.forClass(Map.class);
142+
143+
try (MockedStatic<CatalogUtil> mockedCatalogUtil = mockStatic(CatalogUtil.class)) {
144+
mockedCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull()))
145+
.thenReturn(catalog);
146+
147+
final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig,
148+
pluginMetrics, acknowledgementSetManager, eventFactory);
149+
service.start(buffer);
150+
service.shutdown();
151+
152+
final List<Map<String, String>> captured = propsCaptor.getAllValues();
153+
assertThat(captured.get(0), equalTo(sharedCatalog));
154+
assertThat(captured.get(1), equalTo(tableBCatalog));
155+
}
156+
}
157+
158+
private TableConfig createTableConfig(final String tableName, final Map<String, String> catalogProps) {
159+
final TableConfig config = mock(TableConfig.class);
160+
when(config.getTableName()).thenReturn(tableName);
161+
when(config.getCatalog()).thenReturn(catalogProps);
162+
when(config.getIdentifierColumns()).thenReturn(Collections.emptyList());
163+
lenient().when(config.isDisableExport()).thenReturn(false);
164+
return config;
165+
}
166+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.iceberg;
12+
13+
import com.fasterxml.jackson.databind.ObjectMapper;
14+
import org.junit.jupiter.api.Test;
15+
16+
import java.util.Map;
17+
18+
import static org.hamcrest.MatcherAssert.assertThat;
19+
import static org.hamcrest.Matchers.equalTo;
20+
import static org.hamcrest.Matchers.is;
21+
22+
class IcebergSourceConfigTest {
23+
24+
private static final ObjectMapper MAPPER = new ObjectMapper();
25+
26+
@Test
27+
void default_catalog_is_empty() throws Exception {
28+
final IcebergSourceConfig config = MAPPER.readValue(
29+
"{\"tables\": [{\"table_name\": \"db.t\"}]}", IcebergSourceConfig.class);
30+
assertThat(config.getCatalog().isEmpty(), is(true));
31+
}
32+
33+
@Test
34+
void top_level_catalog_is_deserialized() throws Exception {
35+
final String json = "{\"catalog\": {\"type\": \"rest\", \"uri\": \"http://localhost:8181\"}, " +
36+
"\"tables\": [{\"table_name\": \"db.t\"}]}";
37+
final IcebergSourceConfig config = MAPPER.readValue(json, IcebergSourceConfig.class);
38+
assertThat(config.getCatalog(), equalTo(Map.of("type", "rest", "uri", "http://localhost:8181")));
39+
}
40+
}

0 commit comments

Comments
 (0)