diff --git a/data-prepper-plugins/jdbc-source-coordination-store/build.gradle b/data-prepper-plugins/jdbc-source-coordination-store/build.gradle new file mode 100644 index 0000000000..39e16c6f77 --- /dev/null +++ b/data-prepper-plugins/jdbc-source-coordination-store/build.gradle @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +dependencies { + implementation project(':data-prepper-api') + implementation 'com.zaxxer:HikariCP:5.1.0' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation 'javax.inject:javax.inject:1' + runtimeOnly 'org.postgresql:postgresql:42.7.7' + runtimeOnly 'com.mysql:mysql-connector-j:8.4.0' + testImplementation 'org.awaitility:awaitility:4.3.0' +} + +jacocoTestCoverageVerification { + dependsOn jacocoTestReport + violationRules { + rule { + limit { + minimum = 0.99 + } + } + } +} + +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + useJUnitPlatform() + classpath = sourceSets.integrationTest.runtimeClasspath + + systemProperty 'tests.jdbc.url', System.getProperty('tests.jdbc.url', '') + systemProperty 'tests.jdbc.username', System.getProperty('tests.jdbc.username', '') + systemProperty 'tests.jdbc.password', System.getProperty('tests.jdbc.password', '') + + filter { + includeTestsMatching '*IT' + } +} diff --git a/data-prepper-plugins/jdbc-source-coordination-store/docker/docker-compose.yml b/data-prepper-plugins/jdbc-source-coordination-store/docker/docker-compose.yml new file mode 100644 index 0000000000..3690027c84 --- /dev/null +++ b/data-prepper-plugins/jdbc-source-coordination-store/docker/docker-compose.yml @@ -0,0 +1,26 @@ +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +services: + postgres: + image: postgres:16 + environment: + POSTGRES_DB: dataprepper + POSTGRES_USER: dp_user + POSTGRES_PASSWORD: dp_pass + ports: + - "5433:5432" + + mysql: + image: mysql:8.4 + environment: + MYSQL_DATABASE: dataprepper + MYSQL_USER: dp_user + MYSQL_PASSWORD: dp_pass + MYSQL_ROOT_PASSWORD: root_pass + ports: + - "3307:3306" diff --git a/data-prepper-plugins/jdbc-source-coordination-store/src/integrationTest/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcSourceCoordinationStoreIT.java b/data-prepper-plugins/jdbc-source-coordination-store/src/integrationTest/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcSourceCoordinationStoreIT.java new file mode 100644 index 0000000000..0936d295a0 --- /dev/null +++ b/data-prepper-plugins/jdbc-source-coordination-store/src/integrationTest/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcSourceCoordinationStoreIT.java @@ -0,0 +1,236 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.sourcecoordinator.jdbc; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class JdbcSourceCoordinationStoreIT { + + private static final String JDBC_URL = getProperty("tests.jdbc.url", "jdbc:postgresql://localhost:5433/dataprepper"); + private static final String JDBC_USER = getProperty("tests.jdbc.username", "dp_user"); + private static final String JDBC_PASS = getProperty("tests.jdbc.password", "dp_pass"); + + private static String getProperty(final String key, final String defaultValue) { + final String value = System.getProperty(key); + return value != null && !value.isEmpty() ? value : defaultValue; + } + + private JdbcSourceCoordinationStore store; + private static final String SOURCE_ID = "test-source"; + + @BeforeEach + void setUp() throws Exception { + try (Connection conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASS); + Statement stmt = conn.createStatement()) { + stmt.execute("DROP TABLE IF EXISTS source_coordination"); + } + + final JdbcStoreSettings settings = new JdbcStoreSettings( + JDBC_URL, JDBC_USER, JDBC_PASS, + null, null, null, null, null); + store = new JdbcSourceCoordinationStore(settings); + store.initializeStore(); + } + + @Test + void initializeStore_creates_table() { + assertThat(store.tryCreatePartitionItem( + SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false), is(true)); + } + + @Test + void initializeStore_is_idempotent() { + store.initializeStore(); + assertThat(store.tryCreatePartitionItem( + SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false), is(true)); + } + + @Test + void tryCreatePartitionItem_returns_false_for_duplicate() { + assertThat(store.tryCreatePartitionItem(SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false), is(true)); + assertThat(store.tryCreatePartitionItem(SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false), is(false)); + } + + @Test + void getSourcePartitionItem_returns_created_item() { + store.tryCreatePartitionItem(SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, "{\"k\":\"v\"}", false); + + final Optional result = store.getSourcePartitionItem(SOURCE_ID, "p1"); + assertThat(result.isPresent(), is(true)); + assertThat(result.get().getSourceIdentifier(), equalTo(SOURCE_ID)); + assertThat(result.get().getSourcePartitionKey(), equalTo("p1")); + assertThat(result.get().getSourcePartitionStatus(), equalTo(SourcePartitionStatus.UNASSIGNED)); + assertThat(result.get().getPartitionProgressState(), equalTo("{\"k\":\"v\"}")); + assertThat(result.get().getClosedCount(), equalTo(0L)); + } + + @Test + void getSourcePartitionItem_returns_empty_for_nonexistent() { + assertThat(store.getSourcePartitionItem(SOURCE_ID, "nonexistent").isPresent(), is(false)); + } + + @Test + void tryUpdateSourcePartitionItem_updates_with_version_check() { + store.tryCreatePartitionItem(SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + final SourcePartitionStoreItem item = store.getSourcePartitionItem(SOURCE_ID, "p1").get(); + + item.setSourcePartitionStatus(SourcePartitionStatus.ASSIGNED); + item.setPartitionOwner("node-1"); + item.setPartitionOwnershipTimeout(Instant.now().plusSeconds(600)); + store.tryUpdateSourcePartitionItem(item); + + final SourcePartitionStoreItem updated = store.getSourcePartitionItem(SOURCE_ID, "p1").get(); + assertThat(updated.getSourcePartitionStatus(), equalTo(SourcePartitionStatus.ASSIGNED)); + assertThat(updated.getPartitionOwner(), equalTo("node-1")); + assertThat(updated.getPartitionOwnershipTimeout(), is(notNullValue())); + } + + @Test + void tryUpdateSourcePartitionItem_throws_on_version_mismatch() { + store.tryCreatePartitionItem(SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + final SourcePartitionStoreItem item1 = store.getSourcePartitionItem(SOURCE_ID, "p1").get(); + final SourcePartitionStoreItem item2 = store.getSourcePartitionItem(SOURCE_ID, "p1").get(); + + item1.setSourcePartitionStatus(SourcePartitionStatus.ASSIGNED); + item1.setPartitionOwnershipTimeout(Instant.now().plusSeconds(600)); + store.tryUpdateSourcePartitionItem(item1); + + item2.setSourcePartitionStatus(SourcePartitionStatus.ASSIGNED); + item2.setPartitionOwnershipTimeout(Instant.now().plusSeconds(600)); + assertThrows(PartitionUpdateException.class, () -> store.tryUpdateSourcePartitionItem(item2)); + } + + @Test + void tryAcquireAvailablePartition_acquires_unassigned() { + store.tryCreatePartitionItem(SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + store.tryCreatePartitionItem(SOURCE_ID, "p2", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + + final Optional acquired = store.tryAcquireAvailablePartition( + SOURCE_ID, "node-1", Duration.ofMinutes(10)); + + assertThat(acquired.isPresent(), is(true)); + assertThat(acquired.get().getSourcePartitionStatus(), equalTo(SourcePartitionStatus.ASSIGNED)); + assertThat(acquired.get().getPartitionOwner(), equalTo("node-1")); + } + + @Test + void tryAcquireAvailablePartition_returns_empty_when_none_available() { + assertThat(store.tryAcquireAvailablePartition( + SOURCE_ID, "node-1", Duration.ofMinutes(10)).isPresent(), is(false)); + } + + @Test + void tryAcquireAvailablePartition_acquires_expired_assigned() { + store.tryCreatePartitionItem(SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + + final Optional first = store.tryAcquireAvailablePartition( + SOURCE_ID, "node-1", Duration.ofSeconds(2)); + assertThat(first.isPresent(), is(true)); + + await().atMost(10, TimeUnit.SECONDS).until(() -> + store.tryAcquireAvailablePartition(SOURCE_ID, "node-2", Duration.ofMinutes(10)).isPresent()); + + final SourcePartitionStoreItem reacquired = store.getSourcePartitionItem(SOURCE_ID, "p1").get(); + assertThat(reacquired.getPartitionOwner(), equalTo("node-2")); + } + + @Test + void tryAcquireAvailablePartition_acquires_closed_with_expired_reopen() { + store.tryCreatePartitionItem(SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + final SourcePartitionStoreItem item = store.getSourcePartitionItem(SOURCE_ID, "p1").get(); + + item.setSourcePartitionStatus(SourcePartitionStatus.CLOSED); + item.setReOpenAt(Instant.now().minusSeconds(10)); + item.setClosedCount(1L); + store.tryUpdateSourcePartitionItem(item); + + final Optional acquired = store.tryAcquireAvailablePartition( + SOURCE_ID, "node-1", Duration.ofMinutes(10)); + assertThat(acquired.isPresent(), is(true)); + assertThat(acquired.get().getPartitionOwner(), equalTo("node-1")); + } + + @Test + void tryDeletePartitionItem_deletes_item() { + store.tryCreatePartitionItem(SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + final SourcePartitionStoreItem item = store.getSourcePartitionItem(SOURCE_ID, "p1").get(); + + store.tryDeletePartitionItem(item); + + assertThat(store.getSourcePartitionItem(SOURCE_ID, "p1").isPresent(), is(false)); + } + + @Test + void queryAllSourcePartitionItems_returns_all_for_source() { + store.tryCreatePartitionItem(SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + store.tryCreatePartitionItem(SOURCE_ID, "p2", SourcePartitionStatus.ASSIGNED, 0L, null, false); + store.tryCreatePartitionItem("other-source", "p3", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + + final List items = store.queryAllSourcePartitionItems(SOURCE_ID); + assertThat(items.size(), equalTo(2)); + } + + @Test + void querySourcePartitionItemsByStatus_filters_correctly() { + store.tryCreatePartitionItem(SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + store.tryCreatePartitionItem(SOURCE_ID, "p2", SourcePartitionStatus.ASSIGNED, 0L, null, false); + + final List items = store.querySourcePartitionItemsByStatus( + SOURCE_ID, SourcePartitionStatus.UNASSIGNED, Instant.EPOCH.toString()); + assertThat(items.size(), equalTo(1)); + assertThat(items.get(0).getSourcePartitionKey(), equalTo("p1")); + } + + @Test + void deleteExpiredItems_deletes_completed_partitions_past_ttl() throws Exception { + try (Connection conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASS); + Statement stmt = conn.createStatement()) { + stmt.execute("DROP TABLE IF EXISTS source_coordination"); + } + + final JdbcStoreSettings ttlSettings = new JdbcStoreSettings( + JDBC_URL, JDBC_USER, JDBC_PASS, + null, null, null, Duration.ofSeconds(2), null); + final JdbcSourceCoordinationStore ttlStore = new JdbcSourceCoordinationStore(ttlSettings); + ttlStore.initializeStore(); + + ttlStore.tryCreatePartitionItem(SOURCE_ID, "p1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + final SourcePartitionStoreItem item = ttlStore.getSourcePartitionItem(SOURCE_ID, "p1").get(); + item.setSourcePartitionStatus(SourcePartitionStatus.COMPLETED); + ttlStore.tryUpdateSourcePartitionItem(item); + + assertThat(ttlStore.getSourcePartitionItem(SOURCE_ID, "p1").isPresent(), is(true)); + + await().atMost(10, TimeUnit.SECONDS).until(() -> { + ttlStore.deleteExpiredItems(); + return !ttlStore.getSourcePartitionItem(SOURCE_ID, "p1").isPresent(); + }); + } +} diff --git a/data-prepper-plugins/jdbc-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcPartitionItem.java b/data-prepper-plugins/jdbc-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcPartitionItem.java new file mode 100644 index 0000000000..72310095b0 --- /dev/null +++ b/data-prepper-plugins/jdbc-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcPartitionItem.java @@ -0,0 +1,124 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.sourcecoordinator.jdbc; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; + +import java.time.Instant; + +public class JdbcPartitionItem implements SourcePartitionStoreItem { + + private String sourceIdentifier; + private String sourcePartitionKey; + private String partitionOwner; + private String partitionProgressState; + private SourcePartitionStatus sourcePartitionStatus; + private Instant partitionOwnershipTimeout; + private Instant reOpenAt; + private Long closedCount; + private String partitionPriority; + private long version; + + @Override + public String getSourceIdentifier() { + return sourceIdentifier; + } + + @Override + public String getSourcePartitionKey() { + return sourcePartitionKey; + } + + @Override + public String getPartitionOwner() { + return partitionOwner; + } + + @Override + public String getPartitionProgressState() { + return partitionProgressState; + } + + @Override + public SourcePartitionStatus getSourcePartitionStatus() { + return sourcePartitionStatus; + } + + @Override + public Instant getPartitionOwnershipTimeout() { + return partitionOwnershipTimeout; + } + + @Override + public Instant getReOpenAt() { + return reOpenAt; + } + + @Override + public Long getClosedCount() { + return closedCount; + } + + public String getPartitionPriority() { + return partitionPriority; + } + + public long getVersion() { + return version; + } + + @Override + public void setSourcePartitionKey(final String sourcePartitionKey) { + this.sourcePartitionKey = sourcePartitionKey; + } + + @Override + public void setPartitionOwner(final String partitionOwner) { + this.partitionOwner = partitionOwner; + } + + @Override + public void setPartitionProgressState(final String partitionProgressState) { + this.partitionProgressState = partitionProgressState; + } + + @Override + public void setSourcePartitionStatus(final SourcePartitionStatus sourcePartitionStatus) { + this.sourcePartitionStatus = sourcePartitionStatus; + } + + @Override + public void setPartitionOwnershipTimeout(final Instant partitionOwnershipTimeout) { + this.partitionOwnershipTimeout = partitionOwnershipTimeout; + } + + @Override + public void setReOpenAt(final Instant reOpenAt) { + this.reOpenAt = reOpenAt; + } + + @Override + public void setClosedCount(final Long closedCount) { + this.closedCount = closedCount; + } + + public void setSourceIdentifier(final String sourceIdentifier) { + this.sourceIdentifier = sourceIdentifier; + } + + public void setPartitionPriority(final String partitionPriority) { + this.partitionPriority = partitionPriority; + } + + public void setVersion(final long version) { + this.version = version; + } +} diff --git a/data-prepper-plugins/jdbc-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcSourceCoordinationStore.java b/data-prepper-plugins/jdbc-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcSourceCoordinationStore.java new file mode 100644 index 0000000000..5cce6c7cb6 --- /dev/null +++ b/data-prepper-plugins/jdbc-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcSourceCoordinationStore.java @@ -0,0 +1,473 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.sourcecoordinator.jdbc; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.source.SourceCoordinationStore; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.sql.Types; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +@DataPrepperPlugin(name = "jdbc", + pluginType = SourceCoordinationStore.class, + pluginConfigurationType = JdbcStoreSettings.class) +public class JdbcSourceCoordinationStore implements SourceCoordinationStore, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceCoordinationStore.class); + private static final String UNIQUE_VIOLATION_SQLSTATE = "23505"; + private static final String DUPLICATE_RELATION_SQLSTATE = "42P07"; + private static final int MYSQL_DUPLICATE_KEY_NAME_ERROR = 1061; + private static final Duration TTL_CLEANUP_INTERVAL = Duration.ofHours(1); + + private final JdbcStoreSettings settings; + private final AtomicBoolean initialized = new AtomicBoolean(false); + private HikariDataSource dataSource; + private ScheduledExecutorService ttlExecutor; + + @DataPrepperPluginConstructor + public JdbcSourceCoordinationStore(final JdbcStoreSettings settings) { + this.settings = settings; + } + + @Override + public void initializeStore() { + if (!initialized.compareAndSet(false, true)) { + LOG.debug("JDBC coordination store already initialized, skipping"); + return; + } + final HikariConfig hikariConfig = new HikariConfig(); + hikariConfig.setJdbcUrl(settings.getUrl()); + hikariConfig.setMaximumPoolSize(settings.getMaxPoolSize()); + + final Properties props = new Properties(); + props.setProperty("user", settings.getUsername()); + props.setProperty("password", settings.getPassword()); + if (settings.getConnectionProperties() != null) { + props.putAll(settings.getConnectionProperties()); + } + hikariConfig.setDataSourceProperties(props); + + this.dataSource = createDataSource(hikariConfig); + + if (!settings.skipTableCreation()) { + createTable(); + } + + if (settings.getTtl() != null) { + ttlExecutor = Executors.newSingleThreadScheduledExecutor(r -> { + final Thread t = new Thread(r, "jdbc-coordination-ttl"); + t.setDaemon(true); + return t; + }); + ttlExecutor.scheduleAtFixedRate(this::deleteExpiredItems, + 60, TTL_CLEANUP_INTERVAL.toSeconds(), TimeUnit.SECONDS); + } + } + + HikariDataSource createDataSource(final HikariConfig hikariConfig) { + return new HikariDataSource(hikariConfig); + } + + private void createTable() { + final String tableName = settings.getTableName(); + final String createTableSql = "CREATE TABLE IF NOT EXISTS " + tableName + " (" + + "source_identifier VARCHAR(256) NOT NULL, " + + "source_partition_key VARCHAR(512) NOT NULL, " + + "partition_owner VARCHAR(256), " + + "partition_progress_state TEXT, " + + "source_partition_status VARCHAR(20) NOT NULL, " + + "partition_ownership_timeout TIMESTAMP, " + + "reopen_at TIMESTAMP, " + + "closed_count BIGINT DEFAULT 0, " + + "partition_priority VARCHAR(64), " + + "version BIGINT NOT NULL DEFAULT 0, " + + "expiration_time TIMESTAMP, " + + "PRIMARY KEY (source_identifier, source_partition_key))"; + + try (Connection conn = dataSource.getConnection()) { + try (PreparedStatement stmt = conn.prepareStatement(createTableSql)) { + stmt.execute(); + } + // CREATE INDEX IF NOT EXISTS is not supported by MySQL. + // Use plain CREATE INDEX and ignore the error if the index already exists. + try (PreparedStatement stmt = conn.prepareStatement( + "CREATE INDEX idx_source_status_priority ON " + + tableName + " (source_identifier, source_partition_status, partition_priority)")) { + stmt.execute(); + } catch (final SQLException e) { + if (!isIndexAlreadyExists(e)) { + throw e; + } + LOG.debug("Index already exists, skipping creation"); + } + LOG.info("JDBC coordination store table {} initialized", tableName); + } catch (final SQLException e) { + throw new RuntimeException("Failed to create coordination store table", e); + } + } + + @Override + public Optional getSourcePartitionItem(final String sourceIdentifier, + final String sourcePartitionKey) { + final String sql = "SELECT * FROM " + settings.getTableName() + + " WHERE source_identifier = ? AND source_partition_key = ?"; + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setString(1, sourceIdentifier); + stmt.setString(2, sourcePartitionKey); + try (ResultSet rs = stmt.executeQuery()) { + final List items = mapResultSetToList(rs); + return items.isEmpty() ? Optional.empty() : Optional.of(items.get(0)); + } + } catch (final SQLException e) { + LOG.error("Failed to get partition item for {} / {}", sourceIdentifier, sourcePartitionKey, e); + return Optional.empty(); + } + } + + @Override + public List querySourcePartitionItemsByStatus(final String sourceIdentifier, + final SourcePartitionStatus status, + final String startPartitionPriority) { + final String sql = "SELECT * FROM " + settings.getTableName() + + " WHERE source_identifier = ? AND source_partition_status = ?" + + " AND partition_priority >= ?" + + " ORDER BY partition_priority"; + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setString(1, sourceIdentifier); + stmt.setString(2, status.name()); + stmt.setString(3, startPartitionPriority); + try (ResultSet rs = stmt.executeQuery()) { + return mapResultSetToList(rs); + } + } catch (final SQLException e) { + LOG.error("Failed to query partitions by status for {}", sourceIdentifier, e); + return List.of(); + } + } + + @Override + public List queryAllSourcePartitionItems(final String sourceIdentifier) { + final String sql = "SELECT * FROM " + settings.getTableName() + + " WHERE source_identifier = ?"; + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setString(1, sourceIdentifier); + try (ResultSet rs = stmt.executeQuery()) { + return mapResultSetToList(rs); + } + } catch (final SQLException e) { + LOG.error("Failed to query all partitions for {}", sourceIdentifier, e); + return List.of(); + } + } + + @Override + public boolean tryCreatePartitionItem(final String sourceIdentifier, + final String sourcePartitionKey, + final SourcePartitionStatus sourcePartitionStatus, + final Long closedCount, + final String partitionProgressState, + final boolean isReadOnlyItem) { + final String sql = "INSERT INTO " + settings.getTableName() + + " (source_identifier, source_partition_key, source_partition_status," + + " closed_count, partition_progress_state, partition_priority, version, expiration_time)" + + " VALUES (?, ?, ?, ?, ?, ?, 0, ?)"; + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setString(1, sourceIdentifier); + stmt.setString(2, sourcePartitionKey); + stmt.setString(3, sourcePartitionStatus.name()); + stmt.setLong(4, closedCount); + stmt.setString(5, partitionProgressState); + stmt.setString(6, Instant.now().toString()); + if (!isReadOnlyItem && settings.getTtl() != null) { + stmt.setObject(7, LocalDateTime.ofInstant( + Instant.now().plus(settings.getTtl()), ZoneOffset.UTC)); + } else { + stmt.setNull(7, Types.TIMESTAMP); + } + stmt.executeUpdate(); + return true; + } catch (final SQLException e) { + if (isConstraintViolation(e)) { + return false; + } + LOG.error("Failed to create partition item {} / {}", sourceIdentifier, sourcePartitionKey, e); + return false; + } + } + + @Override + public Optional tryAcquireAvailablePartition(final String sourceIdentifier, + final String ownerId, + final Duration ownershipTimeout) { + // Step 1: ASSIGNED with expired ownership + final LocalDateTime now = LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC); + final Optional assigned = queryAndAcquire(sourceIdentifier, ownerId, ownershipTimeout, + "SELECT * FROM " + settings.getTableName() + + " WHERE source_identifier = ? AND source_partition_status = 'ASSIGNED'" + + " AND partition_ownership_timeout < ?" + + " ORDER BY partition_priority LIMIT 1", now); + if (assigned.isPresent()) { + return assigned; + } + + // Step 2: UNASSIGNED + final Optional unassigned = queryAndAcquire(sourceIdentifier, ownerId, ownershipTimeout, + "SELECT * FROM " + settings.getTableName() + + " WHERE source_identifier = ? AND source_partition_status = 'UNASSIGNED'" + + " ORDER BY partition_priority LIMIT 5", null); + if (unassigned.isPresent()) { + return unassigned; + } + + // Step 3: CLOSED with expired reopen_at + return queryAndAcquire(sourceIdentifier, ownerId, ownershipTimeout, + "SELECT * FROM " + settings.getTableName() + + " WHERE source_identifier = ? AND source_partition_status = 'CLOSED'" + + " AND reopen_at < ?" + + " ORDER BY partition_priority LIMIT 1", now); + } + + private Optional queryAndAcquire(final String sourceIdentifier, + final String ownerId, + final Duration ownershipTimeout, + final String querySql, + final LocalDateTime timeParam) { + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(querySql)) { + stmt.setString(1, sourceIdentifier); + if (timeParam != null) { + stmt.setObject(2, timeParam); + } + try (ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + final JdbcPartitionItem item = mapResultSet(rs); + final Instant timeout = Instant.now().plus(ownershipTimeout); + item.setPartitionOwner(ownerId); + item.setPartitionOwnershipTimeout(timeout); + item.setSourcePartitionStatus(SourcePartitionStatus.ASSIGNED); + item.setPartitionPriority(timeout.toString()); + try { + tryUpdateItem(item); + return Optional.of(item); + } catch (final PartitionUpdateException e) { + // Another node acquired this row, try next + } + } + } + } catch (final SQLException e) { + LOG.error("Failed to acquire partition for {}", sourceIdentifier, e); + } + return Optional.empty(); + } + + @Override + public void tryUpdateSourcePartitionItem(final SourcePartitionStoreItem updateItem) { + tryUpdateSourcePartitionItemInternal(updateItem, null); + } + + @Override + public void tryUpdateSourcePartitionItem(final SourcePartitionStoreItem updateItem, + final Instant priorityForUnassignedPartitions) { + tryUpdateSourcePartitionItemInternal(updateItem, priorityForUnassignedPartitions); + } + + private void tryUpdateSourcePartitionItemInternal(final SourcePartitionStoreItem updateItem, + final Instant priorityOverride) { + final JdbcPartitionItem item = (JdbcPartitionItem) updateItem; + + if (SourcePartitionStatus.CLOSED.equals(item.getSourcePartitionStatus())) { + item.setPartitionPriority(item.getReOpenAt() != null ? item.getReOpenAt().toString() : null); + } + if (SourcePartitionStatus.ASSIGNED.equals(item.getSourcePartitionStatus())) { + item.setPartitionPriority(item.getPartitionOwnershipTimeout() != null + ? item.getPartitionOwnershipTimeout().toString() : null); + } + if (priorityOverride != null && SourcePartitionStatus.UNASSIGNED.equals(item.getSourcePartitionStatus())) { + item.setPartitionPriority(priorityOverride.toString()); + } + + tryUpdateItem(item); + } + + private void tryUpdateItem(final JdbcPartitionItem item) { + final String sql = "UPDATE " + settings.getTableName() + + " SET partition_owner = ?, partition_progress_state = ?," + + " source_partition_status = ?, partition_ownership_timeout = ?," + + " reopen_at = ?, closed_count = ?, partition_priority = ?," + + " version = version + 1, expiration_time = ?" + + " WHERE source_identifier = ? AND source_partition_key = ? AND version = ?"; + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + int idx = 1; + stmt.setString(idx++, item.getPartitionOwner()); + stmt.setString(idx++, item.getPartitionProgressState()); + stmt.setString(idx++, item.getSourcePartitionStatus().name()); + setInstant(stmt, idx++, item.getPartitionOwnershipTimeout()); + setInstant(stmt, idx++, item.getReOpenAt()); + stmt.setLong(idx++, item.getClosedCount() != null ? item.getClosedCount() : 0); + stmt.setString(idx++, item.getPartitionPriority()); + if (settings.getTtl() != null) { + setInstant(stmt, idx++, Instant.now().plus(settings.getTtl())); + } else { + setInstant(stmt, idx++, null); + } + stmt.setString(idx++, item.getSourceIdentifier()); + stmt.setString(idx++, item.getSourcePartitionKey()); + stmt.setLong(idx++, item.getVersion()); + + final int updated = stmt.executeUpdate(); + if (updated == 0) { + throw new PartitionUpdateException( + String.format("Failed to update partition %s. Version mismatch (expected %d).", + item.getSourcePartitionKey(), item.getVersion()), null); + } + item.setVersion(item.getVersion() + 1); + } catch (final PartitionUpdateException e) { + throw e; + } catch (final SQLException e) { + throw new PartitionUpdateException( + String.format("Failed to update partition %s", item.getSourcePartitionKey()), e); + } + } + + @Override + public void tryDeletePartitionItem(final SourcePartitionStoreItem deleteItem) { + final JdbcPartitionItem item = (JdbcPartitionItem) deleteItem; + final String sql = "DELETE FROM " + settings.getTableName() + + " WHERE source_identifier = ? AND source_partition_key = ? AND version = ?"; + + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setString(1, item.getSourceIdentifier()); + stmt.setString(2, item.getSourcePartitionKey()); + stmt.setLong(3, item.getVersion()); + final int deleted = stmt.executeUpdate(); + if (deleted == 0) { + throw new PartitionUpdateException( + String.format("Failed to delete partition %s. Version mismatch.", item.getSourcePartitionKey()), null); + } + } catch (final PartitionUpdateException e) { + throw e; + } catch (final SQLException e) { + throw new PartitionUpdateException( + String.format("Failed to delete partition %s", item.getSourcePartitionKey()), e); + } + } + + @Override + public void close() { + if (ttlExecutor != null) { + ttlExecutor.shutdownNow(); + } + if (dataSource != null) { + dataSource.close(); + } + } + + void deleteExpiredItems() { + final String sql = "DELETE FROM " + settings.getTableName() + + " WHERE expiration_time < ?"; + try (Connection conn = dataSource.getConnection(); + PreparedStatement stmt = conn.prepareStatement(sql)) { + stmt.setObject(1, LocalDateTime.ofInstant(Instant.now(), ZoneOffset.UTC)); + final int deleted = stmt.executeUpdate(); + if (deleted > 0) { + LOG.debug("Deleted {} expired partition items", deleted); + } + } catch (final SQLException e) { + LOG.warn("Failed to delete expired partition items", e); + } + } + + private JdbcPartitionItem mapResultSet(final ResultSet rs) throws SQLException { + final JdbcPartitionItem item = new JdbcPartitionItem(); + item.setSourceIdentifier(rs.getString("source_identifier")); + item.setSourcePartitionKey(rs.getString("source_partition_key")); + item.setPartitionOwner(rs.getString("partition_owner")); + item.setPartitionProgressState(rs.getString("partition_progress_state")); + item.setSourcePartitionStatus(SourcePartitionStatus.valueOf(rs.getString("source_partition_status"))); + item.setPartitionOwnershipTimeout(getInstant(rs, "partition_ownership_timeout")); + item.setReOpenAt(getInstant(rs, "reopen_at")); + item.setClosedCount(rs.getLong("closed_count")); + item.setPartitionPriority(rs.getString("partition_priority")); + item.setVersion(rs.getLong("version")); + return item; + } + + private List mapResultSetToList(final ResultSet rs) throws SQLException { + final List items = new ArrayList<>(); + while (rs.next()) { + items.add(mapResultSet(rs)); + } + return items; + } + + private static void setInstant(final PreparedStatement stmt, final int index, final Instant instant) + throws SQLException { + if (instant != null) { + stmt.setObject(index, LocalDateTime.ofInstant(instant, ZoneOffset.UTC)); + } else { + stmt.setNull(index, Types.TIMESTAMP); + } + } + + private static Instant getInstant(final ResultSet rs, final String column) throws SQLException { + final LocalDateTime ldt = rs.getObject(column, LocalDateTime.class); + return ldt != null ? ldt.toInstant(ZoneOffset.UTC) : null; + } + + // PostgreSQL: SQLSTATE 42P07 (duplicate_table, also covers indexes/relations) + // MySQL: Error 1061 (ER_DUP_KEYNAME) + private static boolean isIndexAlreadyExists(final SQLException ex) { + return DUPLICATE_RELATION_SQLSTATE.equals(ex.getSQLState()) + || ex.getErrorCode() == MYSQL_DUPLICATE_KEY_NAME_ERROR; + } + + // MySQL throws SQLIntegrityConstraintViolationException (JDBC standard) on PK violation. + // PostgreSQL does not throw that subclass, but sets SQLSTATE 23505 (unique_violation, + // defined in ISO/IEC 9075). Both cases indicate the row already exists. + private static boolean isConstraintViolation(final SQLException ex) { + return ex instanceof SQLIntegrityConstraintViolationException + || UNIQUE_VIOLATION_SQLSTATE.equals(ex.getSQLState()); + } +} diff --git a/data-prepper-plugins/jdbc-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcStoreSettings.java b/data-prepper-plugins/jdbc-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcStoreSettings.java new file mode 100644 index 0000000000..fa3538fe5c --- /dev/null +++ b/data-prepper-plugins/jdbc-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcStoreSettings.java @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.sourcecoordinator.jdbc; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; + +public class JdbcStoreSettings { + + private static final String DEFAULT_TABLE_NAME = "source_coordination"; + private static final int DEFAULT_MAX_POOL_SIZE = 5; + + private final String url; + private final String username; + private final String password; + private final String tableName; + private final boolean skipTableCreation; + private final int maxPoolSize; + private final Duration ttl; + private final Map connectionProperties; + + @JsonCreator + public JdbcStoreSettings( + @JsonProperty("url") final String url, + @JsonProperty("username") final String username, + @JsonProperty("password") final String password, + @JsonProperty("table_name") final String tableName, + @JsonProperty("skip_table_creation") final Boolean skipTableCreation, + @JsonProperty("max_pool_size") final Integer maxPoolSize, + @JsonProperty("ttl") final Duration ttl, + @JsonProperty("connection_properties") final Map connectionProperties) { + Objects.requireNonNull(url, "url is required for JDBC store settings"); + Objects.requireNonNull(username, "username is required for JDBC store settings"); + Objects.requireNonNull(password, "password is required for JDBC store settings"); + + this.url = url; + this.username = username; + this.password = password; + this.tableName = tableName != null ? tableName : DEFAULT_TABLE_NAME; + this.skipTableCreation = skipTableCreation != null ? skipTableCreation : false; + this.maxPoolSize = maxPoolSize != null ? maxPoolSize : DEFAULT_MAX_POOL_SIZE; + this.ttl = ttl; + this.connectionProperties = connectionProperties; + } + + public String getUrl() { + return url; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public String getTableName() { + return tableName; + } + + public boolean skipTableCreation() { + return skipTableCreation; + } + + public int getMaxPoolSize() { + return maxPoolSize; + } + + public Duration getTtl() { + return ttl; + } + + public Map getConnectionProperties() { + return connectionProperties; + } +} diff --git a/data-prepper-plugins/jdbc-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcPartitionItemTest.java b/data-prepper-plugins/jdbc-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcPartitionItemTest.java new file mode 100644 index 0000000000..932249744b --- /dev/null +++ b/data-prepper-plugins/jdbc-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcPartitionItemTest.java @@ -0,0 +1,67 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.sourcecoordinator.jdbc; + +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus; + +import java.time.Instant; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +class JdbcPartitionItemTest { + + @Test + void getters_and_setters() { + final JdbcPartitionItem item = new JdbcPartitionItem(); + final Instant now = Instant.now(); + + item.setSourceIdentifier("source-1"); + item.setSourcePartitionKey("partition-1"); + item.setPartitionOwner("owner-1"); + item.setPartitionProgressState("{\"offset\":100}"); + item.setSourcePartitionStatus(SourcePartitionStatus.ASSIGNED); + item.setPartitionOwnershipTimeout(now); + item.setReOpenAt(now.plusSeconds(60)); + item.setClosedCount(3L); + item.setPartitionPriority(now.toString()); + item.setVersion(5); + + assertThat(item.getSourceIdentifier(), equalTo("source-1")); + assertThat(item.getSourcePartitionKey(), equalTo("partition-1")); + assertThat(item.getPartitionOwner(), equalTo("owner-1")); + assertThat(item.getPartitionProgressState(), equalTo("{\"offset\":100}")); + assertThat(item.getSourcePartitionStatus(), equalTo(SourcePartitionStatus.ASSIGNED)); + assertThat(item.getPartitionOwnershipTimeout(), equalTo(now)); + assertThat(item.getReOpenAt(), equalTo(now.plusSeconds(60))); + assertThat(item.getClosedCount(), equalTo(3L)); + assertThat(item.getPartitionPriority(), equalTo(now.toString())); + assertThat(item.getVersion(), equalTo(5L)); + } + + @Test + void defaults_are_null_or_zero() { + final JdbcPartitionItem item = new JdbcPartitionItem(); + + assertThat(item.getSourceIdentifier(), is(nullValue())); + assertThat(item.getSourcePartitionKey(), is(nullValue())); + assertThat(item.getPartitionOwner(), is(nullValue())); + assertThat(item.getPartitionProgressState(), is(nullValue())); + assertThat(item.getSourcePartitionStatus(), is(nullValue())); + assertThat(item.getPartitionOwnershipTimeout(), is(nullValue())); + assertThat(item.getReOpenAt(), is(nullValue())); + assertThat(item.getClosedCount(), is(nullValue())); + assertThat(item.getPartitionPriority(), is(nullValue())); + assertThat(item.getVersion(), equalTo(0L)); + } +} diff --git a/data-prepper-plugins/jdbc-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcSourceCoordinationStoreTest.java b/data-prepper-plugins/jdbc-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcSourceCoordinationStoreTest.java new file mode 100644 index 0000000000..76b4ce08b5 --- /dev/null +++ b/data-prepper-plugins/jdbc-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcSourceCoordinationStoreTest.java @@ -0,0 +1,735 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.sourcecoordinator.jdbc; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStatus; +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.sql.Types; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class JdbcSourceCoordinationStoreTest { + + @Mock + private Connection connection; + + @Mock + private PreparedStatement preparedStatement; + + @Mock + private ResultSet resultSet; + + @Mock + private HikariDataSource mockDataSource; + + private JdbcStoreSettings settings; + private JdbcSourceCoordinationStore store; + + @BeforeEach + void setUp() throws SQLException { + settings = new JdbcStoreSettings( + "jdbc:postgresql://localhost/test", "user", "pass", + null, true, null, null, null); + lenient().when(mockDataSource.getConnection()).thenReturn(connection); + lenient().when(connection.prepareStatement(anyString())).thenReturn(preparedStatement); + } + + @AfterEach + void tearDown() { + if (store != null) { + store.close(); + } + } + + private void createInitializedStore() { + store = new JdbcSourceCoordinationStore(settings) { + @Override + HikariDataSource createDataSource(final HikariConfig hikariConfig) { + return mockDataSource; + } + }; + store.initializeStore(); + } + + @Test + void initializeStore_second_call_is_skipped() { + final int[] createDataSourceCallCount = {0}; + store = new JdbcSourceCoordinationStore(settings) { + @Override + HikariDataSource createDataSource(final HikariConfig hikariConfig) { + createDataSourceCallCount[0]++; + return mockDataSource; + } + }; + store.initializeStore(); + store.initializeStore(); + + assertThat(createDataSourceCallCount[0], equalTo(1)); + } + + @Test + void getSourcePartitionItem_returns_item_when_found() throws Exception { + createInitializedStore(); + + when(resultSet.next()).thenReturn(true, false); + mockResultSetRow(); + when(preparedStatement.executeQuery()).thenReturn(resultSet); + + final Optional result = store.getSourcePartitionItem("source-1", "partition-1"); + + assertThat(result.isPresent(), is(true)); + assertThat(result.get().getSourceIdentifier(), equalTo("source-1")); + } + + @Test + void getSourcePartitionItem_returns_empty_when_not_found() throws Exception { + createInitializedStore(); + + when(resultSet.next()).thenReturn(false); + when(preparedStatement.executeQuery()).thenReturn(resultSet); + + final Optional result = store.getSourcePartitionItem("source-1", "nonexistent"); + + assertThat(result.isPresent(), is(false)); + } + + @Test + void getSourcePartitionItem_returns_empty_on_sql_exception() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeQuery()).thenThrow(new SQLException("connection error")); + + final Optional result = store.getSourcePartitionItem("source-1", "partition-1"); + + assertThat(result.isPresent(), is(false)); + } + + @Test + void querySourcePartitionItemsByStatus_returns_matching_items() throws Exception { + createInitializedStore(); + + when(resultSet.next()).thenReturn(true, false); + mockResultSetRow(); + when(preparedStatement.executeQuery()).thenReturn(resultSet); + + final List result = store.querySourcePartitionItemsByStatus( + "source-1", SourcePartitionStatus.UNASSIGNED, Instant.EPOCH.toString()); + + assertThat(result.size(), equalTo(1)); + assertThat(result.get(0).getSourceIdentifier(), equalTo("source-1")); + assertThat(result.get(0).getSourcePartitionStatus(), equalTo(SourcePartitionStatus.UNASSIGNED)); + } + + @Test + void querySourcePartitionItemsByStatus_returns_empty_on_sql_exception() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeQuery()).thenThrow(new SQLException("error")); + + final List result = store.querySourcePartitionItemsByStatus( + "source-1", SourcePartitionStatus.UNASSIGNED, Instant.EPOCH.toString()); + + assertThat(result, is(empty())); + } + + @Test + void queryAllSourcePartitionItems_returns_items() throws Exception { + createInitializedStore(); + + when(resultSet.next()).thenReturn(true, true, false); + mockResultSetRow(); + when(preparedStatement.executeQuery()).thenReturn(resultSet); + + final List result = store.queryAllSourcePartitionItems("source-1"); + + assertThat(result.size(), equalTo(2)); + assertThat(result.get(0).getSourceIdentifier(), equalTo("source-1")); + } + + @Test + void queryAllSourcePartitionItems_returns_empty_on_sql_exception() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeQuery()).thenThrow(new SQLException("error")); + + final List result = store.queryAllSourcePartitionItems("source-1"); + + assertThat(result, is(empty())); + } + + @Test + void tryCreatePartitionItem_returns_true_on_success() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(1); + + final boolean result = store.tryCreatePartitionItem( + "source-1", "partition-1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + + assertThat(result, is(true)); + } + + @Test + void tryCreatePartitionItem_returns_false_on_constraint_violation_sqlstate() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenThrow(new SQLException("duplicate", "23505")); + + final boolean result = store.tryCreatePartitionItem( + "source-1", "partition-1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + + assertThat(result, is(false)); + } + + @Test + void tryCreatePartitionItem_returns_false_on_integrity_constraint_violation() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenThrow(new SQLIntegrityConstraintViolationException("duplicate")); + + final boolean result = store.tryCreatePartitionItem( + "source-1", "partition-1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + + assertThat(result, is(false)); + } + + @Test + void tryCreatePartitionItem_with_ttl_sets_expiration_time() throws Exception { + settings = new JdbcStoreSettings( + "jdbc:postgresql://localhost/test", "user", "pass", + null, true, null, Duration.ofHours(1), null); + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(1); + + final boolean result = store.tryCreatePartitionItem( + "source-1", "partition-1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + + assertThat(result, is(true)); + verify(preparedStatement, never()).setNull(7, Types.TIMESTAMP); + } + + @Test + void tryCreatePartitionItem_readonly_does_not_set_expiration_time() throws Exception { + settings = new JdbcStoreSettings( + "jdbc:postgresql://localhost/test", "user", "pass", + null, true, null, Duration.ofHours(1), null); + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(1); + + final boolean result = store.tryCreatePartitionItem( + "source-1", "partition-1", SourcePartitionStatus.UNASSIGNED, 0L, null, true); + + assertThat(result, is(true)); + verify(preparedStatement).setNull(7, Types.TIMESTAMP); + } + + @Test + void tryUpdateSourcePartitionItem_succeeds_when_version_matches() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(1); + + final JdbcPartitionItem item = createTestItem("source-1", "partition-1", SourcePartitionStatus.ASSIGNED, 0); + item.setPartitionOwnershipTimeout(Instant.now().plusSeconds(600)); + + store.tryUpdateSourcePartitionItem(item); + + assertThat(item.getVersion(), equalTo(1L)); + } + + @Test + void tryUpdateSourcePartitionItem_throws_on_version_mismatch() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(0); + + final JdbcPartitionItem item = createTestItem("source-1", "partition-1", SourcePartitionStatus.ASSIGNED, 0); + + assertThrows(PartitionUpdateException.class, () -> store.tryUpdateSourcePartitionItem(item)); + } + + @Test + void tryUpdateSourcePartitionItem_throws_on_sql_exception() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenThrow(new SQLException("error")); + + final JdbcPartitionItem item = createTestItem("source-1", "partition-1", SourcePartitionStatus.ASSIGNED, 0); + + assertThrows(PartitionUpdateException.class, () -> store.tryUpdateSourcePartitionItem(item)); + } + + @Test + void tryUpdateSourcePartitionItem_sets_priority_for_closed_status() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(1); + + final Instant reopenAt = Instant.now().plusSeconds(300); + final JdbcPartitionItem item = createTestItem("source-1", "partition-1", SourcePartitionStatus.CLOSED, 0); + item.setReOpenAt(reopenAt); + + store.tryUpdateSourcePartitionItem(item); + + assertThat(item.getPartitionPriority(), equalTo(reopenAt.toString())); + } + + @Test + void tryUpdateSourcePartitionItem_with_priority_override_for_unassigned() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(1); + + final Instant priorityOverride = Instant.now(); + final JdbcPartitionItem item = createTestItem("source-1", "partition-1", SourcePartitionStatus.UNASSIGNED, 0); + + store.tryUpdateSourcePartitionItem(item, priorityOverride); + + assertThat(item.getPartitionPriority(), equalTo(priorityOverride.toString())); + } + + @Test + void tryDeletePartitionItem_succeeds_when_version_matches() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(1); + + final JdbcPartitionItem item = createTestItem("source-1", "partition-1", SourcePartitionStatus.UNASSIGNED, 0); + + store.tryDeletePartitionItem(item); + + verify(preparedStatement).executeUpdate(); + } + + @Test + void tryDeletePartitionItem_throws_on_version_mismatch() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(0); + + final JdbcPartitionItem item = createTestItem("source-1", "partition-1", SourcePartitionStatus.UNASSIGNED, 0); + + assertThrows(PartitionUpdateException.class, () -> store.tryDeletePartitionItem(item)); + } + + @Test + void tryDeletePartitionItem_throws_on_sql_exception() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenThrow(new SQLException("error")); + + final JdbcPartitionItem item = createTestItem("source-1", "partition-1", SourcePartitionStatus.UNASSIGNED, 0); + + assertThrows(PartitionUpdateException.class, () -> store.tryDeletePartitionItem(item)); + } + + @Test + void tryAcquireAvailablePartition_returns_empty_when_no_rows() throws Exception { + createInitializedStore(); + + when(resultSet.next()).thenReturn(false); + when(preparedStatement.executeQuery()).thenReturn(resultSet); + + final Optional result = store.tryAcquireAvailablePartition( + "source-1", "node-1", Duration.ofMinutes(10)); + + assertThat(result.isPresent(), is(false)); + } + + @Test + void tryAcquireAvailablePartition_acquires_available_row() throws Exception { + createInitializedStore(); + + final ResultSet emptyRs = mock(ResultSet.class); + when(emptyRs.next()).thenReturn(false); + + final ResultSet unassignedRs = mock(ResultSet.class); + when(unassignedRs.next()).thenReturn(true, false); + mockResultSetOnRs(unassignedRs); + + final PreparedStatement stmt1 = mock(PreparedStatement.class); + when(stmt1.executeQuery()).thenReturn(emptyRs); + + final PreparedStatement stmt2 = mock(PreparedStatement.class); + when(stmt2.executeQuery()).thenReturn(unassignedRs); + + final PreparedStatement updateStmt = mock(PreparedStatement.class); + when(updateStmt.executeUpdate()).thenReturn(1); + + when(connection.prepareStatement(anyString())) + .thenReturn(stmt1) + .thenReturn(stmt2) + .thenReturn(updateStmt); + + final Optional result = store.tryAcquireAvailablePartition( + "source-1", "node-1", Duration.ofMinutes(10)); + + assertThat(result.isPresent(), is(true)); + assertThat(result.get().getPartitionOwner(), equalTo("node-1")); + assertThat(result.get().getSourcePartitionStatus(), equalTo(SourcePartitionStatus.ASSIGNED)); + } + + @Test + void tryAcquireAvailablePartition_returns_empty_on_sql_exception() throws Exception { + createInitializedStore(); + + when(connection.prepareStatement(anyString())).thenThrow(new SQLException("error")); + + final Optional result = store.tryAcquireAvailablePartition( + "source-1", "node-1", Duration.ofMinutes(10)); + + assertThat(result.isPresent(), is(false)); + } + + @Test + void deleteExpiredItems_executes_delete() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(3); + + store.deleteExpiredItems(); + + verify(preparedStatement).executeUpdate(); + } + + @Test + void deleteExpiredItems_handles_sql_exception() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenThrow(new SQLException("error")); + + store.deleteExpiredItems(); + + verify(preparedStatement).executeUpdate(); + } + + @Test + void close_shuts_down_data_source() { + createInitializedStore(); + + store.close(); + + verify(mockDataSource).close(); + } + + @Test + void close_shuts_down_ttl_executor() { + settings = new JdbcStoreSettings( + "jdbc:postgresql://localhost/test", "user", "pass", + null, true, null, Duration.ofHours(1), null); + createInitializedStore(); + + store.close(); + + verify(mockDataSource).close(); + // ttlExecutor is created internally when TTL is configured. + // We verify indirectly that close() does not throw when ttlExecutor is non-null. + } + + @Test + void close_handles_null_fields() { + store = new JdbcSourceCoordinationStore(settings); + store.close(); + } + + @Test + void initializeStore_creates_table_when_skip_is_false() throws Exception { + settings = new JdbcStoreSettings( + "jdbc:postgresql://localhost/test", "user", "pass", + null, false, null, null, null); + createInitializedStore(); + + // createTable calls prepareStatement for CREATE TABLE and CREATE INDEX + verify(connection, atLeast(2)).prepareStatement(anyString()); + } + + @Test + void initializeStore_creates_table_handles_existing_index() throws Exception { + settings = new JdbcStoreSettings( + "jdbc:postgresql://localhost/test", "user", "pass", + null, false, null, null, null); + + final PreparedStatement createTableStmt = mock(PreparedStatement.class); + final PreparedStatement createIndexStmt = mock(PreparedStatement.class); + when(createIndexStmt.execute()).thenThrow(new SQLException("already exists", "42P07")); + + when(connection.prepareStatement(anyString())) + .thenReturn(createTableStmt) + .thenReturn(createIndexStmt); + + createInitializedStore(); + + assertThat(store, is(notNullValue())); + } + + @Test + void initializeStore_with_connection_properties() { + settings = new JdbcStoreSettings( + "jdbc:postgresql://localhost/test", "user", "pass", + null, true, null, null, Map.of("ssl", "true")); + createInitializedStore(); + + assertThat(store, is(notNullValue())); + } + + @Test + void tryUpdateSourcePartitionItem_with_ttl_sets_expiration() throws Exception { + settings = new JdbcStoreSettings( + "jdbc:postgresql://localhost/test", "user", "pass", + null, true, null, Duration.ofHours(1), null); + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(1); + + final JdbcPartitionItem item = createTestItem("source-1", "partition-1", SourcePartitionStatus.ASSIGNED, 0); + item.setPartitionOwnershipTimeout(Instant.now().plusSeconds(600)); + + store.tryUpdateSourcePartitionItem(item); + + assertThat(item.getVersion(), equalTo(1L)); + } + + @Test + void tryAcquireAvailablePartition_tries_next_on_version_conflict() throws Exception { + createInitializedStore(); + + final ResultSet emptyRs = mock(ResultSet.class); + when(emptyRs.next()).thenReturn(false); + + final ResultSet unassignedRs = mock(ResultSet.class); + when(unassignedRs.next()).thenReturn(true, true, false); + mockResultSetOnRs(unassignedRs); + + final PreparedStatement stmt1 = mock(PreparedStatement.class); + when(stmt1.executeQuery()).thenReturn(emptyRs); + + final PreparedStatement stmt2 = mock(PreparedStatement.class); + when(stmt2.executeQuery()).thenReturn(unassignedRs); + + final PreparedStatement failUpdateStmt = mock(PreparedStatement.class); + when(failUpdateStmt.executeUpdate()).thenReturn(0); + + final PreparedStatement successUpdateStmt = mock(PreparedStatement.class); + when(successUpdateStmt.executeUpdate()).thenReturn(1); + + when(connection.prepareStatement(anyString())) + .thenReturn(stmt1) + .thenReturn(stmt2) + .thenReturn(failUpdateStmt) + .thenReturn(successUpdateStmt); + + final Optional result = store.tryAcquireAvailablePartition( + "source-1", "node-1", Duration.ofMinutes(10)); + + assertThat(result.isPresent(), is(true)); + } + + @Test + void createTable_throws_on_non_index_sql_exception() throws Exception { + settings = new JdbcStoreSettings( + "jdbc:postgresql://localhost/test", "user", "pass", + null, false, null, null, null); + + final PreparedStatement createTableStmt = mock(PreparedStatement.class); + final PreparedStatement createIndexStmt = mock(PreparedStatement.class); + when(createIndexStmt.execute()).thenThrow(new SQLException("unexpected error", "42000")); + + when(connection.prepareStatement(anyString())) + .thenReturn(createTableStmt) + .thenReturn(createIndexStmt); + + assertThrows(RuntimeException.class, () -> createInitializedStore()); + } + + @Test + void createTable_throws_on_create_table_failure() throws Exception { + settings = new JdbcStoreSettings( + "jdbc:postgresql://localhost/test", "user", "pass", + null, false, null, null, null); + + when(preparedStatement.execute()).thenThrow(new SQLException("connection lost")); + + assertThrows(RuntimeException.class, () -> createInitializedStore()); + } + + @Test + void createTable_handles_mysql_duplicate_index() throws Exception { + settings = new JdbcStoreSettings( + "jdbc:postgresql://localhost/test", "user", "pass", + null, false, null, null, null); + + final PreparedStatement createTableStmt = mock(PreparedStatement.class); + final PreparedStatement createIndexStmt = mock(PreparedStatement.class); + final SQLException mysqlDuplicate = new SQLException("Duplicate key name", "HY000", 1061); + when(createIndexStmt.execute()).thenThrow(mysqlDuplicate); + + when(connection.prepareStatement(anyString())) + .thenReturn(createTableStmt) + .thenReturn(createIndexStmt); + + createInitializedStore(); + } + + @Test + void tryCreatePartitionItem_returns_false_on_non_constraint_sql_exception() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenThrow(new SQLException("connection lost", "08001")); + + final boolean result = store.tryCreatePartitionItem( + "source-1", "partition-1", SourcePartitionStatus.UNASSIGNED, 0L, null, false); + + assertThat(result, is(false)); + } + + @Test + void tryAcquireAvailablePartition_returns_assigned_expired() throws Exception { + createInitializedStore(); + + final ResultSet assignedRs = mock(ResultSet.class); + when(assignedRs.next()).thenReturn(true, false); + mockResultSetOnRs(assignedRs); + lenient().when(assignedRs.getString("source_partition_status")).thenReturn("ASSIGNED"); + + final PreparedStatement queryStmt = mock(PreparedStatement.class); + when(queryStmt.executeQuery()).thenReturn(assignedRs); + + final PreparedStatement updateStmt = mock(PreparedStatement.class); + when(updateStmt.executeUpdate()).thenReturn(1); + + when(connection.prepareStatement(anyString())) + .thenReturn(queryStmt) + .thenReturn(updateStmt); + + final Optional result = store.tryAcquireAvailablePartition( + "source-1", "node-1", Duration.ofMinutes(10)); + + assertThat(result.isPresent(), is(true)); + assertThat(result.get().getSourcePartitionStatus(), equalTo(SourcePartitionStatus.ASSIGNED)); + } + + @Test + void tryUpdateSourcePartitionItem_sets_null_priority_for_closed_with_null_reopen() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(1); + + final JdbcPartitionItem item = createTestItem("source-1", "partition-1", SourcePartitionStatus.CLOSED, 0); + item.setReOpenAt(null); + + store.tryUpdateSourcePartitionItem(item); + + assertThat(item.getPartitionPriority(), is(nullValue())); + } + + @Test + void tryUpdateSourcePartitionItem_priority_override_ignored_for_non_unassigned() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(1); + + final JdbcPartitionItem item = createTestItem("source-1", "partition-1", SourcePartitionStatus.ASSIGNED, 0); + item.setPartitionOwnershipTimeout(Instant.now().plusSeconds(600)); + + store.tryUpdateSourcePartitionItem(item, Instant.now()); + + assertThat(item.getPartitionPriority(), equalTo(item.getPartitionOwnershipTimeout().toString())); + } + + @Test + void tryUpdateSourcePartitionItem_handles_null_closed_count() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(1); + + final JdbcPartitionItem item = createTestItem("source-1", "partition-1", SourcePartitionStatus.UNASSIGNED, 0); + item.setClosedCount(null); + + store.tryUpdateSourcePartitionItem(item); + + assertThat(item.getVersion(), equalTo(1L)); + } + + @Test + void deleteExpiredItems_no_items_deleted() throws Exception { + createInitializedStore(); + + when(preparedStatement.executeUpdate()).thenReturn(0); + + store.deleteExpiredItems(); + + verify(preparedStatement).executeUpdate(); + } + + private JdbcPartitionItem createTestItem(final String sourceId, final String partitionKey, + final SourcePartitionStatus status, final long version) { + final JdbcPartitionItem item = new JdbcPartitionItem(); + item.setSourceIdentifier(sourceId); + item.setSourcePartitionKey(partitionKey); + item.setSourcePartitionStatus(status); + item.setVersion(version); + item.setClosedCount(0L); + return item; + } + + private void mockResultSetRow() throws SQLException { + mockResultSetOnRs(resultSet); + } + + private void mockResultSetOnRs(final ResultSet rs) throws SQLException { + lenient().when(rs.getString("source_identifier")).thenReturn("source-1"); + lenient().when(rs.getString("source_partition_key")).thenReturn("partition-1"); + lenient().when(rs.getString("partition_owner")).thenReturn(null); + lenient().when(rs.getString("partition_progress_state")).thenReturn(null); + lenient().when(rs.getString("source_partition_status")).thenReturn("UNASSIGNED"); + lenient().when(rs.getObject("partition_ownership_timeout", LocalDateTime.class)).thenReturn(null); + lenient().when(rs.getObject("reopen_at", LocalDateTime.class)).thenReturn(null); + lenient().when(rs.getLong("closed_count")).thenReturn(0L); + lenient().when(rs.getString("partition_priority")).thenReturn(Instant.now().toString()); + lenient().when(rs.getLong("version")).thenReturn(0L); + } +} diff --git a/data-prepper-plugins/jdbc-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcStoreSettingsTest.java b/data-prepper-plugins/jdbc-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcStoreSettingsTest.java new file mode 100644 index 0000000000..6780066c10 --- /dev/null +++ b/data-prepper-plugins/jdbc-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/jdbc/JdbcStoreSettingsTest.java @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.sourcecoordinator.jdbc; + +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Map; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class JdbcStoreSettingsTest { + + @Test + void constructor_with_required_fields() { + final JdbcStoreSettings settings = new JdbcStoreSettings( + "jdbc:postgresql://localhost/db", "user", "pass", + null, null, null, null, null); + + assertThat(settings.getUrl(), equalTo("jdbc:postgresql://localhost/db")); + assertThat(settings.getUsername(), equalTo("user")); + assertThat(settings.getPassword(), equalTo("pass")); + assertThat(settings.getTableName(), equalTo("source_coordination")); + assertThat(settings.skipTableCreation(), is(false)); + assertThat(settings.getMaxPoolSize(), equalTo(5)); + assertThat(settings.getTtl(), is(nullValue())); + assertThat(settings.getConnectionProperties(), is(nullValue())); + } + + @Test + void constructor_with_all_fields() { + final JdbcStoreSettings settings = new JdbcStoreSettings( + "jdbc:mysql://localhost/db", "admin", "secret", + "custom_table", true, 10, Duration.ofHours(24), + Map.of("ssl", "true")); + + assertThat(settings.getTableName(), equalTo("custom_table")); + assertThat(settings.skipTableCreation(), is(true)); + assertThat(settings.getMaxPoolSize(), equalTo(10)); + assertThat(settings.getTtl(), equalTo(Duration.ofHours(24))); + assertThat(settings.getConnectionProperties(), equalTo(Map.of("ssl", "true"))); + } + + @Test + void constructor_requires_url() { + assertThrows(NullPointerException.class, () -> + new JdbcStoreSettings(null, "user", "pass", null, null, null, null, null)); + } + + @Test + void constructor_requires_username() { + assertThrows(NullPointerException.class, () -> + new JdbcStoreSettings("jdbc:postgresql://localhost/db", null, "pass", null, null, null, null, null)); + } + + @Test + void constructor_requires_password() { + assertThrows(NullPointerException.class, () -> + new JdbcStoreSettings("jdbc:postgresql://localhost/db", "user", null, null, null, null, null, null)); + } +} diff --git a/settings.gradle b/settings.gradle index 82053f53d9..fcd6d608a5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -159,6 +159,7 @@ include 'data-prepper-plugins:trace-peer-forwarder-processor' include 'data-prepper-plugins:translate-processor' include 'data-prepper-plugins:truncate-processor' include 'data-prepper-plugins:dynamodb-source-coordination-store' +include 'data-prepper-plugins:jdbc-source-coordination-store' include 'release' include 'release:archives' include 'release:archives:linux'