Skip to content

Commit 8f92bb5

Browse files
Merge upstream/main into opensearch-sink-tsdb-6644
Signed-off-by: Srikanth Padakanti <srikanth_padakanti@apple.com>
2 parents 15c537d + 8ea59d9 commit 8f92bb5

57 files changed

Lines changed: 4630 additions & 260 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/kafka-plugin-integration-tests.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ jobs:
6666
-Dtests.kafka.bootstrap_servers=localhost:9092 \
6767
-Dtests.kafka.authconfig.username=admin -Dtests.kafka.authconfig.password=admin \
6868
-Dtests.kafka.kms_key=alias/DataPrepperTesting \
69-
--tests '*kafka.buffer*' --tests KafkaSourceJsonTypeIT --tests KafkaBufferOTelIT
69+
--tests '*kafka.buffer*' --tests KafkaSourceJsonTypeIT --tests KafkaBufferOTelIT \
70+
--tests KafkaSinkJsonTypeIT --tests KafkaSinkPlainTextTypeIT
7071
7172
- name: Upload Unit Test Results
7273
if: always()

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ data-prepper-main/src/test/resources/logstash-conf/logstash-filter.yaml
1515

1616
# Eclipse/IDE compiled output
1717
**/bin/
18+
*.project
19+
*.classpath
20+
*.factorypath
21+
**/.settings/**
22+
*.prefs
1823

1924
# Development tools
2025
.DS_Store

data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java

Lines changed: 82 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
3434
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.PartitionFactory;
3535
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition;
36+
import org.opensearch.dataprepper.event.TestEventFactory;
3637
import org.opensearch.dataprepper.plugins.sourcecoordinator.inmemory.InMemorySourceCoordinationStore;
3738

3839
import java.time.Duration;
@@ -140,6 +141,74 @@ void export_writes_all_rows_to_buffer() throws Exception {
140141
}
141142
}
142143

144+
@Test
145+
void export_with_shared_catalog_config() throws Exception {
146+
final Table table = helper.createTable(TEST_NAMESPACE, TEST_TABLE, TEST_SCHEMA);
147+
helper.appendRows(table, List.of(
148+
helper.newRecord(TEST_SCHEMA, 1, "Alice", 30),
149+
helper.newRecord(TEST_SCHEMA, 2, "Bob", 25)
150+
));
151+
152+
final IcebergService service = createService(false, true);
153+
final Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer = createMockBuffer();
154+
service.start(buffer);
155+
156+
try {
157+
await().atMost(30, TimeUnit.SECONDS)
158+
.untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2))));
159+
assertThat(receivedRecords, hasSize(2));
160+
} finally {
161+
service.shutdown();
162+
}
163+
}
164+
165+
@Test
166+
void export_with_mixed_catalog_config() throws Exception {
167+
// Table A uses shared catalog, Table B overrides with its own catalog (same catalog in test, but exercises the override path)
168+
final String tableNameA = "mixed_a";
169+
final String tableNameB = "mixed_b";
170+
final Table tableA = helper.createTable(TEST_NAMESPACE, tableNameA, TEST_SCHEMA);
171+
final Table tableB = helper.createTable(TEST_NAMESPACE, tableNameB, TEST_SCHEMA);
172+
helper.appendRows(tableA, List.of(helper.newRecord(TEST_SCHEMA, 1, "Alice", 30)));
173+
helper.appendRows(tableB, List.of(helper.newRecord(TEST_SCHEMA, 2, "Bob", 25)));
174+
175+
final IcebergSourceConfig sourceConfig = mock(IcebergSourceConfig.class);
176+
final TableConfig configA = mock(TableConfig.class);
177+
final TableConfig configB = mock(TableConfig.class);
178+
179+
when(configA.getTableName()).thenReturn(TEST_NAMESPACE + "." + tableNameA);
180+
when(configA.getCatalog()).thenReturn(null);
181+
when(configA.getIdentifierColumns()).thenReturn(List.of("id"));
182+
when(configA.isDisableExport()).thenReturn(false);
183+
184+
when(configB.getTableName()).thenReturn(TEST_NAMESPACE + "." + tableNameB);
185+
when(configB.getCatalog()).thenReturn(helper.catalogProperties());
186+
when(configB.getIdentifierColumns()).thenReturn(List.of("id"));
187+
when(configB.isDisableExport()).thenReturn(false);
188+
189+
when(sourceConfig.getCatalog()).thenReturn(helper.catalogProperties());
190+
when(sourceConfig.getTables()).thenReturn(List.of(configA, configB));
191+
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
192+
when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
193+
194+
final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator();
195+
coordinator.createPartition(new LeaderPartition());
196+
final IcebergService service = new IcebergService(coordinator, sourceConfig, pluginMetrics,
197+
acknowledgementSetManager, TestEventFactory.getTestEventFactory());
198+
final Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer = createMockBuffer();
199+
service.start(buffer);
200+
201+
try {
202+
await().atMost(30, TimeUnit.SECONDS)
203+
.untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2))));
204+
assertThat(receivedRecords, hasSize(2));
205+
} finally {
206+
service.shutdown();
207+
helper.dropTable(TEST_NAMESPACE, tableNameA);
208+
helper.dropTable(TEST_NAMESPACE, tableNameB);
209+
}
210+
}
211+
143212
@Test
144213
void cdc_insert_produces_insert_events() throws Exception {
145214
final CdcTestFixture fixture = createCdcTestFixture();
@@ -284,17 +353,28 @@ private Buffer<org.opensearch.dataprepper.model.record.Record<Event>> createMock
284353
}
285354

286355
private IcebergService createService(final boolean disableExport) throws Exception {
356+
return createService(disableExport, false);
357+
}
358+
359+
private IcebergService createService(final boolean disableExport, final boolean useSharedCatalog) throws Exception {
287360
final String fullTableName = TEST_NAMESPACE + "." + TEST_TABLE;
288361

289362
// Build config via reflection since fields are private
290363
final IcebergSourceConfig sourceConfig = mock(IcebergSourceConfig.class);
291364
final TableConfig tableConfig = mock(TableConfig.class);
292365

293366
when(tableConfig.getTableName()).thenReturn(fullTableName);
294-
when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties());
295367
when(tableConfig.getIdentifierColumns()).thenReturn(List.of("id"));
296368
when(tableConfig.isDisableExport()).thenReturn(disableExport);
297369

370+
if (useSharedCatalog) {
371+
when(tableConfig.getCatalog()).thenReturn(null);
372+
when(sourceConfig.getCatalog()).thenReturn(helper.catalogProperties());
373+
} else {
374+
when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties());
375+
when(sourceConfig.getCatalog()).thenReturn(null);
376+
}
377+
298378
when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));
299379
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
300380
lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
@@ -303,7 +383,7 @@ private IcebergService createService(final boolean disableExport) throws Excepti
303383
coordinator.createPartition(new LeaderPartition());
304384

305385
return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager,
306-
org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory());
386+
TestEventFactory.getTestEventFactory());
307387
}
308388

309389
private EnhancedSourceCoordinator createInMemoryCoordinator() {

data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public void start(final Buffer<Record<Event>> buffer) {
6868
final String tableName = tableConfig.getTableName();
6969
LOG.info("Loading catalog and table for {}", tableName);
7070

71-
final Map<String, String> catalogProps = new HashMap<>(tableConfig.getCatalog());
71+
final Map<String, String> catalogProps = new HashMap<>(
72+
tableConfig.getCatalog() != null ? tableConfig.getCatalog() : sourceConfig.getCatalog());
7273
final Catalog catalog = CatalogUtil.buildIcebergCatalog(tableName, catalogProps, null);
7374

7475
final TableIdentifier tableId = TableIdentifier.parse(tableName);

data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,17 @@
1515
import jakarta.validation.constraints.NotEmpty;
1616

1717
import java.time.Duration;
18+
import java.util.Collections;
1819
import java.util.List;
20+
import java.util.Map;
1921

2022
public class IcebergSourceConfig {
2123

2224
static final Duration DEFAULT_POLLING_INTERVAL = Duration.ofSeconds(30);
2325

26+
@JsonProperty("catalog")
27+
private Map<String, String> catalog = Collections.emptyMap();
28+
2429
@JsonProperty("tables")
2530
@NotEmpty
2631
@Valid
@@ -43,4 +48,8 @@ public Duration getPollingInterval() {
4348
public boolean isAcknowledgmentsEnabled() {
4449
return acknowledgments;
4550
}
51+
52+
public Map<String, String> getCatalog() {
53+
return catalog;
54+
}
4655
}

data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/TableConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class TableConfig {
2424
private String tableName;
2525

2626
@JsonProperty("catalog")
27-
private Map<String, String> catalog = Collections.emptyMap();
27+
private Map<String, String> catalog;
2828

2929
@JsonProperty("identifier_columns")
3030
private List<String> identifierColumns = Collections.emptyList();
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.iceberg;
12+
13+
import org.apache.iceberg.CatalogUtil;
14+
import org.apache.iceberg.Table;
15+
import org.apache.iceberg.catalog.Catalog;
16+
import org.apache.iceberg.catalog.TableIdentifier;
17+
import org.junit.jupiter.api.Test;
18+
import org.junit.jupiter.api.extension.ExtendWith;
19+
import org.mockito.ArgumentCaptor;
20+
import org.mockito.Mock;
21+
import org.mockito.MockedStatic;
22+
import org.mockito.junit.jupiter.MockitoExtension;
23+
import org.opensearch.dataprepper.metrics.PluginMetrics;
24+
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
25+
import org.opensearch.dataprepper.model.buffer.Buffer;
26+
import org.opensearch.dataprepper.model.event.Event;
27+
import org.opensearch.dataprepper.model.event.EventFactory;
28+
import org.opensearch.dataprepper.model.record.Record;
29+
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
30+
31+
import java.time.Duration;
32+
import java.util.Collections;
33+
import java.util.List;
34+
import java.util.Map;
35+
36+
import static org.hamcrest.MatcherAssert.assertThat;
37+
import static org.hamcrest.Matchers.equalTo;
38+
import static org.mockito.ArgumentMatchers.any;
39+
import static org.mockito.ArgumentMatchers.anyString;
40+
import static org.mockito.ArgumentMatchers.isNull;
41+
import static org.mockito.Mockito.lenient;
42+
import static org.mockito.Mockito.mock;
43+
import static org.mockito.Mockito.mockStatic;
44+
import static org.mockito.Mockito.when;
45+
46+
@ExtendWith(MockitoExtension.class)
47+
class IcebergServiceTest {
48+
49+
@Mock
50+
private EnhancedSourceCoordinator sourceCoordinator;
51+
@Mock
52+
private IcebergSourceConfig sourceConfig;
53+
@Mock
54+
private PluginMetrics pluginMetrics;
55+
@Mock
56+
private AcknowledgementSetManager acknowledgementSetManager;
57+
@Mock
58+
private EventFactory eventFactory;
59+
@Mock
60+
private Buffer<Record<Event>> buffer;
61+
62+
@Test
63+
@SuppressWarnings("unchecked")
64+
void start_uses_shared_catalog_when_table_catalog_is_null() {
65+
final Map<String, String> sharedCatalog = Map.of("type", "rest", "uri", "http://shared:8181");
66+
final TableConfig tableConfig = createTableConfig("db.my_table", null);
67+
68+
when(sourceConfig.getCatalog()).thenReturn(sharedCatalog);
69+
when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));
70+
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
71+
lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
72+
73+
final Catalog catalog = mock(Catalog.class);
74+
final Table table = mock(Table.class);
75+
when(table.properties()).thenReturn(Collections.emptyMap());
76+
when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table);
77+
78+
final ArgumentCaptor<Map<String, String>> propsCaptor = ArgumentCaptor.forClass(Map.class);
79+
80+
try (MockedStatic<CatalogUtil> mockedCatalogUtil = mockStatic(CatalogUtil.class)) {
81+
mockedCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull()))
82+
.thenReturn(catalog);
83+
84+
final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig,
85+
pluginMetrics, acknowledgementSetManager, eventFactory);
86+
service.start(buffer);
87+
service.shutdown();
88+
89+
assertThat(propsCaptor.getValue(), equalTo(sharedCatalog));
90+
}
91+
}
92+
93+
@Test
94+
@SuppressWarnings("unchecked")
95+
void start_uses_table_catalog_when_table_catalog_is_set() {
96+
final Map<String, String> tableCatalog = Map.of("type", "glue", "warehouse", "s3://other/");
97+
final TableConfig tableConfig = createTableConfig("db.my_table", tableCatalog);
98+
99+
when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));
100+
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
101+
lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
102+
103+
final Catalog catalog = mock(Catalog.class);
104+
final Table table = mock(Table.class);
105+
when(table.properties()).thenReturn(Collections.emptyMap());
106+
when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table);
107+
108+
final ArgumentCaptor<Map<String, String>> propsCaptor = ArgumentCaptor.forClass(Map.class);
109+
110+
try (MockedStatic<CatalogUtil> mockedCatalogUtil = mockStatic(CatalogUtil.class)) {
111+
mockedCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull()))
112+
.thenReturn(catalog);
113+
114+
final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig,
115+
pluginMetrics, acknowledgementSetManager, eventFactory);
116+
service.start(buffer);
117+
service.shutdown();
118+
119+
assertThat(propsCaptor.getValue(), equalTo(tableCatalog));
120+
}
121+
}
122+
123+
@Test
124+
@SuppressWarnings("unchecked")
125+
void start_uses_correct_catalog_for_each_table_in_mixed_config() {
126+
final Map<String, String> sharedCatalog = Map.of("type", "rest", "uri", "http://shared:8181");
127+
final Map<String, String> tableBCatalog = Map.of("type", "glue", "warehouse", "s3://other/");
128+
final TableConfig configA = createTableConfig("db.table_a", null);
129+
final TableConfig configB = createTableConfig("db.table_b", tableBCatalog);
130+
131+
when(sourceConfig.getCatalog()).thenReturn(sharedCatalog);
132+
when(sourceConfig.getTables()).thenReturn(List.of(configA, configB));
133+
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
134+
lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
135+
136+
final Catalog catalog = mock(Catalog.class);
137+
final Table table = mock(Table.class);
138+
when(table.properties()).thenReturn(Collections.emptyMap());
139+
when(catalog.loadTable(any(TableIdentifier.class))).thenReturn(table);
140+
141+
final ArgumentCaptor<Map<String, String>> propsCaptor = ArgumentCaptor.forClass(Map.class);
142+
143+
try (MockedStatic<CatalogUtil> mockedCatalogUtil = mockStatic(CatalogUtil.class)) {
144+
mockedCatalogUtil.when(() -> CatalogUtil.buildIcebergCatalog(anyString(), propsCaptor.capture(), isNull()))
145+
.thenReturn(catalog);
146+
147+
final IcebergService service = new IcebergService(sourceCoordinator, sourceConfig,
148+
pluginMetrics, acknowledgementSetManager, eventFactory);
149+
service.start(buffer);
150+
service.shutdown();
151+
152+
final List<Map<String, String>> captured = propsCaptor.getAllValues();
153+
assertThat(captured.get(0), equalTo(sharedCatalog));
154+
assertThat(captured.get(1), equalTo(tableBCatalog));
155+
}
156+
}
157+
158+
private TableConfig createTableConfig(final String tableName, final Map<String, String> catalogProps) {
159+
final TableConfig config = mock(TableConfig.class);
160+
when(config.getTableName()).thenReturn(tableName);
161+
when(config.getCatalog()).thenReturn(catalogProps);
162+
when(config.getIdentifierColumns()).thenReturn(Collections.emptyList());
163+
lenient().when(config.isDisableExport()).thenReturn(false);
164+
return config;
165+
}
166+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.source.iceberg;
12+
13+
import com.fasterxml.jackson.databind.ObjectMapper;
14+
import org.junit.jupiter.api.Test;
15+
16+
import java.util.Map;
17+
18+
import static org.hamcrest.MatcherAssert.assertThat;
19+
import static org.hamcrest.Matchers.equalTo;
20+
import static org.hamcrest.Matchers.is;
21+
22+
class IcebergSourceConfigTest {
23+
24+
private static final ObjectMapper MAPPER = new ObjectMapper();
25+
26+
@Test
27+
void default_catalog_is_empty() throws Exception {
28+
final IcebergSourceConfig config = MAPPER.readValue(
29+
"{\"tables\": [{\"table_name\": \"db.t\"}]}", IcebergSourceConfig.class);
30+
assertThat(config.getCatalog().isEmpty(), is(true));
31+
}
32+
33+
@Test
34+
void top_level_catalog_is_deserialized() throws Exception {
35+
final String json = "{\"catalog\": {\"type\": \"rest\", \"uri\": \"http://localhost:8181\"}, " +
36+
"\"tables\": [{\"table_name\": \"db.t\"}]}";
37+
final IcebergSourceConfig config = MAPPER.readValue(json, IcebergSourceConfig.class);
38+
assertThat(config.getCatalog(), equalTo(Map.of("type", "rest", "uri", "http://localhost:8181")));
39+
}
40+
}

0 commit comments

Comments
 (0)