/*
 * Copyright 2021 Collate
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.openmetadata.service.governance.workflows;

import java.io.PrintWriter;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.logging.Logger;
import javax.sql.DataSource;

/**
 * A {@link DataSource} wrapper that intercepts every {@link Connection} it vends and ensures
 * that all {@link Statement} variants created from that connection go through
 * {@link IdempotentDdlStatement}. Only used in migration context so Flowable upgrade scripts
 * skip already-existing DDL objects instead of failing.
 */
final class IdempotentDdlDataSource implements DataSource {

  private final DataSource delegate;

  IdempotentDdlDataSource(DataSource delegate) {
    this.delegate = delegate;
  }

  @Override
  public Connection getConnection() throws SQLException {
    return proxyFor(delegate.getConnection());
  }

  @Override
  public Connection getConnection(String username, String password) throws SQLException {
    return proxyFor(delegate.getConnection(username, password));
  }

  /**
   * Returns a dynamic proxy over {@code target} that forwards every call, wrapping the result
   * of any {@code createStatement} overload (matched by name, so all variants are covered)
   * in an {@link IdempotentDdlStatement}.
   */
  private Connection proxyFor(Connection target) {
    InvocationHandler handler =
        (proxy, method, args) -> {
          Object result = call(method, target, args);
          if ("createStatement".equals(method.getName())) {
            return new IdempotentDdlStatement((Statement) result, target);
          }
          return result;
        };
    return (Connection)
        Proxy.newProxyInstance(
            target.getClass().getClassLoader(), new Class<?>[] {Connection.class}, handler);
  }

  /**
   * Invokes a method on the real connection, unwrapping any {@link InvocationTargetException}
   * so callers receive the original exception type (e.g. {@link SQLException}).
   */
  private static Object call(Method method, Object target, Object[] args) throws Throwable {
    try {
      return method.invoke(target, args);
    } catch (InvocationTargetException e) {
      throw e.getCause();
    }
  }

  @Override
  public PrintWriter getLogWriter() throws SQLException {
    return delegate.getLogWriter();
  }

  @Override
  public void setLogWriter(PrintWriter out) throws SQLException {
    delegate.setLogWriter(out);
  }

  @Override
  public void setLoginTimeout(int seconds) throws SQLException {
    delegate.setLoginTimeout(seconds);
  }

  @Override
  public int getLoginTimeout() throws SQLException {
    return delegate.getLoginTimeout();
  }

  @Override
  public Logger getParentLogger() throws SQLFeatureNotSupportedException {
    return delegate.getParentLogger();
  }

  @Override
  public <T> T unwrap(Class<T> iface) throws SQLException {
    return delegate.unwrap(iface);
  }

  @Override
  public boolean isWrapperFor(Class<?> iface) throws SQLException {
    return delegate.isWrapperFor(iface);
  }
}
/*
 * Copyright 2021 Collate
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.openmetadata.service.governance.workflows;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;

/**
 * A {@link Statement} wrapper that makes Flowable DDL upgrade scripts idempotent using
 * standard JDBC {@link DatabaseMetaData} pre-checks. Before executing a CREATE INDEX,
 * CREATE TABLE, or ALTER TABLE ADD COLUMN statement, it checks whether the object
 * already exists and skips execution if so. This allows interrupted Flowable schema
 * upgrades to resume cleanly.
 *
 * <p>All {@code execute}/{@code executeUpdate} overloads — including the JDBC 4.2
 * {@code executeLargeUpdate} variants — are guarded; every other {@link Statement}
 * method delegates to the wrapped statement. The large-result methods must be overridden
 * explicitly because the {@link Statement} interface defaults throw
 * {@link UnsupportedOperationException} rather than delegating.
 */
@Slf4j
final class IdempotentDdlStatement implements Statement {

  // Matches "CREATE [UNIQUE] INDEX <name> ON <table> (" — group 1: index, group 2: table.
  private static final Pattern CREATE_INDEX_PATTERN =
      Pattern.compile("(?i)create\\s+(?:unique\\s+)?index\\s+(\\S+)\\s+on\\s+(\\S+)\\s*\\(");
  // Matches "CREATE TABLE [IF NOT EXISTS] <name> (" — group 1: table.
  private static final Pattern CREATE_TABLE_PATTERN =
      Pattern.compile("(?i)create\\s+table\\s+(?:if\\s+not\\s+exists\\s+)?(\\S+)\\s*\\(");
  // Negative lookahead prevents matching SQL keywords (CONSTRAINT, PRIMARY, UNIQUE, etc.)
  // as column names when the ADD clause is not a column definition.
  private static final Pattern ALTER_TABLE_ADD_COLUMN_PATTERN =
      Pattern.compile(
          "(?i)alter\\s+table\\s+(\\S+)\\s+add\\s+(?:column\\s+)?"
              + "(?!constraint\\b|primary\\b|unique\\b|foreign\\b|check\\b|index\\b|key\\b)(\\S+)\\s");

  private final Statement delegate;
  private final Connection connection;

  IdempotentDdlStatement(Statement delegate, Connection connection) {
    this.delegate = delegate;
    this.connection = connection;
  }

  /** Removes one pair of surrounding backtick, double-quote, or bracket identifier quotes. */
  private String stripIdentifierQuotes(String identifier) {
    String trimmed = identifier.trim();
    if (trimmed.length() >= 2) {
      char first = trimmed.charAt(0);
      char last = trimmed.charAt(trimmed.length() - 1);
      if ((first == '`' && last == '`')
          || (first == '"' && last == '"')
          || (first == '[' && last == ']')) {
        return trimmed.substring(1, trimmed.length() - 1);
      }
    }
    return trimmed;
  }

  /** Strips quotes and any schema/catalog qualifier, returning the bare object name. */
  private String extractObjectName(String identifier) {
    String unquoted = stripIdentifierQuotes(identifier);
    int dot = unquoted.lastIndexOf('.');
    return dot >= 0 ? stripIdentifierQuotes(unquoted.substring(dot + 1)) : unquoted;
  }

  /**
   * Folds the identifier to the case the database reports it stores unquoted identifiers in,
   * so comparisons against {@link DatabaseMetaData} results are case-consistent.
   */
  private String normalizeIdentifier(DatabaseMetaData meta, String identifier) throws SQLException {
    String name = extractObjectName(identifier);
    if (meta.storesLowerCaseIdentifiers()) return name.toLowerCase(Locale.ROOT);
    if (meta.storesUpperCaseIdentifiers()) return name.toUpperCase(Locale.ROOT);
    return name;
  }

  /**
   * Returns {@code true} when {@code sql} is a recognized DDL statement whose target object
   * (index, table, or column) already exists, meaning execution should be skipped.
   */
  private boolean shouldSkip(String sql) throws SQLException {
    if (sql == null) return false;
    DatabaseMetaData meta = connection.getMetaData();

    Matcher indexMatcher = CREATE_INDEX_PATTERN.matcher(sql);
    if (indexMatcher.find()) {
      String indexName = normalizeIdentifier(meta, indexMatcher.group(1));
      String tableName = normalizeIdentifier(meta, indexMatcher.group(2));
      if (indexExists(meta, indexName, tableName)) {
        LOG.info("Skipping already-existing index: {} on {}", indexName, tableName);
        return true;
      }
      return false;
    }
    Matcher tableMatcher = CREATE_TABLE_PATTERN.matcher(sql);
    if (tableMatcher.find()) {
      String tableName = normalizeIdentifier(meta, tableMatcher.group(1));
      if (tableExists(meta, tableName)) {
        LOG.info("Skipping already-existing table: {}", tableName);
        return true;
      }
      return false;
    }
    Matcher alterMatcher = ALTER_TABLE_ADD_COLUMN_PATTERN.matcher(sql);
    if (alterMatcher.find()) {
      String tableName = normalizeIdentifier(meta, alterMatcher.group(1));
      String columnName = normalizeIdentifier(meta, alterMatcher.group(2));
      if (columnExists(meta, tableName, columnName)) {
        LOG.info("Skipping already-existing column: {}.{}", tableName, columnName);
        return true;
      }
    }
    return false;
  }

  @Override
  public boolean execute(String sql) throws SQLException {
    if (shouldSkip(sql)) return false;
    return delegate.execute(sql);
  }

  @Override
  public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
    if (shouldSkip(sql)) return false;
    return delegate.execute(sql, autoGeneratedKeys);
  }

  @Override
  public boolean execute(String sql, int[] columnIndexes) throws SQLException {
    if (shouldSkip(sql)) return false;
    return delegate.execute(sql, columnIndexes);
  }

  @Override
  public boolean execute(String sql, String[] columnNames) throws SQLException {
    if (shouldSkip(sql)) return false;
    return delegate.execute(sql, columnNames);
  }

  /** True when {@code tableName} has an index named {@code indexName} (case-normalized). */
  private boolean indexExists(DatabaseMetaData meta, String indexName, String tableName)
      throws SQLException {
    String catalog = connection.getCatalog();
    try (ResultSet rs = meta.getIndexInfo(catalog, null, tableName, false, false)) {
      while (rs.next()) {
        String existing = rs.getString("INDEX_NAME");
        if (existing != null && normalizeIdentifier(meta, existing).equals(indexName)) {
          return true;
        }
      }
    }
    return false;
  }

  /** True when a table (or view) named {@code tableName} exists in the current catalog. */
  private boolean tableExists(DatabaseMetaData meta, String tableName) throws SQLException {
    String catalog = connection.getCatalog();
    try (ResultSet rs = meta.getTables(catalog, null, tableName, null)) {
      return rs.next();
    }
  }

  /** True when {@code tableName} already has a column named {@code columnName}. */
  private boolean columnExists(DatabaseMetaData meta, String tableName, String columnName)
      throws SQLException {
    String catalog = connection.getCatalog();
    try (ResultSet rs = meta.getColumns(catalog, null, tableName, null)) {
      while (rs.next()) {
        String existing = rs.getString("COLUMN_NAME");
        if (existing != null && normalizeIdentifier(meta, existing).equals(columnName)) {
          return true;
        }
      }
    }
    return false;
  }

  // All remaining Statement methods delegate to the underlying statement.
  // NOTE(review): addBatch/executeBatch are intentionally not guarded — Flowable DDL
  // scripts appear to run through execute()/executeUpdate(); confirm if batches are used.

  @Override
  public ResultSet executeQuery(String sql) throws SQLException {
    return delegate.executeQuery(sql);
  }

  @Override
  public int executeUpdate(String sql) throws SQLException {
    if (shouldSkip(sql)) return 0;
    return delegate.executeUpdate(sql);
  }

  @Override
  public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
    if (shouldSkip(sql)) return 0;
    return delegate.executeUpdate(sql, autoGeneratedKeys);
  }

  @Override
  public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
    if (shouldSkip(sql)) return 0;
    return delegate.executeUpdate(sql, columnIndexes);
  }

  @Override
  public int executeUpdate(String sql, String[] columnNames) throws SQLException {
    if (shouldSkip(sql)) return 0;
    return delegate.executeUpdate(sql, columnNames);
  }

  // JDBC 4.2 "large" variants: the Statement interface defaults throw
  // UnsupportedOperationException, so they must delegate explicitly — and they are
  // guarded the same way as the int-returning overloads.

  @Override
  public long executeLargeUpdate(String sql) throws SQLException {
    if (shouldSkip(sql)) return 0L;
    return delegate.executeLargeUpdate(sql);
  }

  @Override
  public long executeLargeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
    if (shouldSkip(sql)) return 0L;
    return delegate.executeLargeUpdate(sql, autoGeneratedKeys);
  }

  @Override
  public long executeLargeUpdate(String sql, int[] columnIndexes) throws SQLException {
    if (shouldSkip(sql)) return 0L;
    return delegate.executeLargeUpdate(sql, columnIndexes);
  }

  @Override
  public long executeLargeUpdate(String sql, String[] columnNames) throws SQLException {
    if (shouldSkip(sql)) return 0L;
    return delegate.executeLargeUpdate(sql, columnNames);
  }

  @Override
  public long[] executeLargeBatch() throws SQLException {
    return delegate.executeLargeBatch();
  }

  @Override
  public long getLargeUpdateCount() throws SQLException {
    return delegate.getLargeUpdateCount();
  }

  @Override
  public long getLargeMaxRows() throws SQLException {
    return delegate.getLargeMaxRows();
  }

  @Override
  public void setLargeMaxRows(long max) throws SQLException {
    delegate.setLargeMaxRows(max);
  }

  @Override
  public void close() throws SQLException {
    delegate.close();
  }

  @Override
  public int getMaxFieldSize() throws SQLException {
    return delegate.getMaxFieldSize();
  }

  @Override
  public void setMaxFieldSize(int max) throws SQLException {
    delegate.setMaxFieldSize(max);
  }

  @Override
  public int getMaxRows() throws SQLException {
    return delegate.getMaxRows();
  }

  @Override
  public void setMaxRows(int max) throws SQLException {
    delegate.setMaxRows(max);
  }

  @Override
  public void setEscapeProcessing(boolean enable) throws SQLException {
    delegate.setEscapeProcessing(enable);
  }

  @Override
  public int getQueryTimeout() throws SQLException {
    return delegate.getQueryTimeout();
  }

  @Override
  public void setQueryTimeout(int seconds) throws SQLException {
    delegate.setQueryTimeout(seconds);
  }

  @Override
  public void cancel() throws SQLException {
    delegate.cancel();
  }

  @Override
  public SQLWarning getWarnings() throws SQLException {
    return delegate.getWarnings();
  }

  @Override
  public void clearWarnings() throws SQLException {
    delegate.clearWarnings();
  }

  @Override
  public void setCursorName(String name) throws SQLException {
    delegate.setCursorName(name);
  }

  @Override
  public ResultSet getResultSet() throws SQLException {
    return delegate.getResultSet();
  }

  @Override
  public int getUpdateCount() throws SQLException {
    return delegate.getUpdateCount();
  }

  @Override
  public boolean getMoreResults() throws SQLException {
    return delegate.getMoreResults();
  }

  @Override
  public void setFetchDirection(int direction) throws SQLException {
    delegate.setFetchDirection(direction);
  }

  @Override
  public int getFetchDirection() throws SQLException {
    return delegate.getFetchDirection();
  }

  @Override
  public void setFetchSize(int rows) throws SQLException {
    delegate.setFetchSize(rows);
  }

  @Override
  public int getFetchSize() throws SQLException {
    return delegate.getFetchSize();
  }

  @Override
  public int getResultSetConcurrency() throws SQLException {
    return delegate.getResultSetConcurrency();
  }

  @Override
  public int getResultSetType() throws SQLException {
    return delegate.getResultSetType();
  }

  @Override
  public void addBatch(String sql) throws SQLException {
    delegate.addBatch(sql);
  }

  @Override
  public void clearBatch() throws SQLException {
    delegate.clearBatch();
  }

  @Override
  public int[] executeBatch() throws SQLException {
    return delegate.executeBatch();
  }

  @Override
  public Connection getConnection() throws SQLException {
    return delegate.getConnection();
  }

  @Override
  public boolean getMoreResults(int current) throws SQLException {
    return delegate.getMoreResults(current);
  }

  @Override
  public ResultSet getGeneratedKeys() throws SQLException {
    return delegate.getGeneratedKeys();
  }

  @Override
  public int getResultSetHoldability() throws SQLException {
    return delegate.getResultSetHoldability();
  }

  @Override
  public boolean isClosed() throws SQLException {
    return delegate.isClosed();
  }

  @Override
  public void setPoolable(boolean poolable) throws SQLException {
    delegate.setPoolable(poolable);
  }

  @Override
  public boolean isPoolable() throws SQLException {
    return delegate.isPoolable();
  }

  @Override
  public void closeOnCompletion() throws SQLException {
    delegate.closeOnCompletion();
  }

  @Override
  public boolean isCloseOnCompletion() throws SQLException {
    return delegate.isCloseOnCompletion();
  }

  @Override
  public <T> T unwrap(Class<T> iface) throws SQLException {
    return delegate.unwrap(iface);
  }

  @Override
  public boolean isWrapperFor(Class<?> iface) throws SQLException {
    return delegate.isWrapperFor(iface);
  }
}
+17,12 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import javax.sql.DataSource; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.converter.BpmnXMLConverter; import org.flowable.common.engine.api.FlowableObjectNotFoundException; +import org.flowable.common.engine.api.FlowableWrongDbException; import org.flowable.common.engine.impl.el.DefaultExpressionManager; import org.flowable.engine.HistoryService; import org.flowable.engine.ManagementService; @@ -74,13 +78,16 @@ public class WorkflowHandler { private WorkflowHandler(OpenMetadataApplicationConfig config, boolean isMigrationContext) { this.isMigrationContext = isMigrationContext; - ProcessEngineConfiguration processEngineConfiguration = - new StandaloneProcessEngineConfiguration() - .setJdbcUrl(config.getDataSourceFactory().getUrl()) - .setJdbcUsername(config.getDataSourceFactory().getUser()) - .setJdbcPassword(config.getDataSourceFactory().getPassword()) - .setJdbcDriver(config.getDataSourceFactory().getDriverClass()) - .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE); + StandaloneProcessEngineConfiguration processEngineConfiguration = + new StandaloneProcessEngineConfiguration(); + processEngineConfiguration.setJdbcUrl(config.getDataSourceFactory().getUrl()); + processEngineConfiguration.setJdbcUsername(config.getDataSourceFactory().getUser()); + processEngineConfiguration.setJdbcPassword(config.getDataSourceFactory().getPassword()); + processEngineConfiguration.setJdbcDriver(config.getDataSourceFactory().getDriverClass()); + processEngineConfiguration.setDatabaseSchemaUpdate( + isMigrationContext + ? 
ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE + : ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE); if (ConnectionType.MYSQL.label.equals(config.getDataSourceFactory().getDriverClass())) { processEngineConfiguration.setDatabaseType(ProcessEngineConfiguration.DATABASE_TYPE_MYSQL); @@ -100,6 +107,57 @@ public void initializeExpressionMap(OpenMetadataApplicationConfig config) { config.getPipelineServiceClientConfiguration())); } + private static DataSource migrationDataSource(ProcessEngineConfiguration config) { + if (config.getDataSource() != null) { + return config.getDataSource(); + } + String url = config.getJdbcUrl(); + String user = config.getJdbcUsername(); + String password = config.getJdbcPassword(); + return new DataSource() { + @Override + public java.io.PrintWriter getLogWriter() { + return null; + } + + @Override + public void setLogWriter(java.io.PrintWriter out) {} + + @Override + public void setLoginTimeout(int seconds) {} + + @Override + public int getLoginTimeout() { + return 0; + } + + @Override + public java.util.logging.Logger getParentLogger() { + return java.util.logging.Logger.getLogger("migration"); + } + + @Override + public T unwrap(Class iface) throws SQLException { + throw new SQLException("Not a wrapper"); + } + + @Override + public boolean isWrapperFor(Class iface) { + return false; + } + + @Override + public java.sql.Connection getConnection() throws SQLException { + return DriverManager.getConnection(url, user, password); + } + + @Override + public java.sql.Connection getConnection(String u, String p) throws SQLException { + return DriverManager.getConnection(url, u, p); + } + }; + } + public void initializeNewProcessEngine( ProcessEngineConfiguration currentProcessEngineConfiguration) { ProcessEngines.destroy(); @@ -109,40 +167,44 @@ public void initializeNewProcessEngine( StandaloneProcessEngineConfiguration processEngineConfiguration = new StandaloneProcessEngineConfiguration(); - // Setting Database Configuration - 
processEngineConfiguration - .setJdbcUrl(currentProcessEngineConfiguration.getJdbcUrl()) - .setJdbcUsername(currentProcessEngineConfiguration.getJdbcUsername()) - .setJdbcPassword(currentProcessEngineConfiguration.getJdbcPassword()) - .setJdbcDriver(currentProcessEngineConfiguration.getJdbcDriver()) - .setDatabaseType(currentProcessEngineConfiguration.getDatabaseType()) - .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE); - - // Setting Async Executor Configuration - processEngineConfiguration - .setAsyncExecutorActivate(!isMigrationContext) - .setAsyncExecutorCorePoolSize(workflowSettings.getExecutorConfiguration().getCorePoolSize()) - .setAsyncExecutorMaxPoolSize(workflowSettings.getExecutorConfiguration().getMaxPoolSize()) - .setAsyncExecutorThreadPoolQueueSize( - workflowSettings.getExecutorConfiguration().getQueueSize()) - .setAsyncExecutorAsyncJobLockTimeInMillis( - workflowSettings.getExecutorConfiguration().getJobLockTimeInMillis()) - .setAsyncExecutorMaxAsyncJobsDuePerAcquisition( - workflowSettings.getExecutorConfiguration().getTasksDuePerAcquisition()) - .setAsyncExecutorDefaultAsyncJobAcquireWaitTime( - workflowSettings.getExecutorConfiguration().getAsyncJobAcquisitionInterval()) - .setAsyncExecutorDefaultTimerJobAcquireWaitTime( - workflowSettings.getExecutorConfiguration().getTimerJobAcquisitionInterval()); - - // Setting History CleanUp - disable during migration to prevent race conditions - processEngineConfiguration - .setAsyncHistoryEnabled(true) - .setEnableHistoryCleaning(!isMigrationContext) - .setCleanInstancesEndedAfter( - Duration.ofDays( - workflowSettings.getHistoryCleanUpConfiguration().getCleanAfterNumberOfDays())) - .setHistoryCleaningTimeCycleConfig( - workflowSettings.getHistoryCleanUpConfiguration().getTimeCycleConfig()); + if (isMigrationContext) { + processEngineConfiguration.setDataSource( + new IdempotentDdlDataSource(migrationDataSource(currentProcessEngineConfiguration))); + } else { + 
processEngineConfiguration.setJdbcUrl(currentProcessEngineConfiguration.getJdbcUrl()); + processEngineConfiguration.setJdbcUsername( + currentProcessEngineConfiguration.getJdbcUsername()); + processEngineConfiguration.setJdbcPassword( + currentProcessEngineConfiguration.getJdbcPassword()); + processEngineConfiguration.setJdbcDriver(currentProcessEngineConfiguration.getJdbcDriver()); + } + processEngineConfiguration.setDatabaseType(currentProcessEngineConfiguration.getDatabaseType()); + processEngineConfiguration.setDatabaseSchemaUpdate( + isMigrationContext + ? ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE + : ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE); + processEngineConfiguration.setAsyncExecutorActivate(!isMigrationContext); + processEngineConfiguration.setAsyncExecutorCorePoolSize( + workflowSettings.getExecutorConfiguration().getCorePoolSize()); + processEngineConfiguration.setAsyncExecutorMaxPoolSize( + workflowSettings.getExecutorConfiguration().getMaxPoolSize()); + processEngineConfiguration.setAsyncExecutorThreadPoolQueueSize( + workflowSettings.getExecutorConfiguration().getQueueSize()); + processEngineConfiguration.setAsyncExecutorAsyncJobLockTimeInMillis( + workflowSettings.getExecutorConfiguration().getJobLockTimeInMillis()); + processEngineConfiguration.setAsyncExecutorMaxAsyncJobsDuePerAcquisition( + workflowSettings.getExecutorConfiguration().getTasksDuePerAcquisition()); + processEngineConfiguration.setAsyncExecutorDefaultAsyncJobAcquireWaitTime( + workflowSettings.getExecutorConfiguration().getAsyncJobAcquisitionInterval()); + processEngineConfiguration.setAsyncExecutorDefaultTimerJobAcquireWaitTime( + workflowSettings.getExecutorConfiguration().getTimerJobAcquisitionInterval()); + processEngineConfiguration.setAsyncHistoryEnabled(true); + processEngineConfiguration.setEnableHistoryCleaning(!isMigrationContext); + processEngineConfiguration.setCleanInstancesEndedAfter( + Duration.ofDays( + 
workflowSettings.getHistoryCleanUpConfiguration().getCleanAfterNumberOfDays())); + processEngineConfiguration.setHistoryCleaningTimeCycleConfig( + workflowSettings.getHistoryCleanUpConfiguration().getTimeCycleConfig()); // Add Expression Manager processEngineConfiguration.setExpressionManager(new DefaultExpressionManager(expressionMap)); @@ -150,7 +212,21 @@ public void initializeNewProcessEngine( // Add Global Failure Listener processEngineConfiguration.setEventListeners(List.of(new WorkflowFailureListener())); - this.processEngine = processEngineConfiguration.buildProcessEngine(); + try { + this.processEngine = processEngineConfiguration.buildProcessEngine(); + } catch (FlowableWrongDbException e) { + String hint = + isMigrationContext + ? String.format( + "Flowable schema version mismatch during migration: DB has '%s', library expects '%s'. " + + "Re-running migrate should auto-heal any partial upgrade.", + e.getDbVersion(), e.getLibraryVersion()) + : String.format( + "Flowable schema not initialized or at unexpected version (DB: '%s', expected: '%s'). 
" + + "Run `openmetadata-ops.sh migrate` before starting the server.", + e.getDbVersion(), e.getLibraryVersion()); + throw new IllegalStateException(hint, e); + } // Add SqlMapper processEngine diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java index 16e750427194..325053ee7ccd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/OpenMetadataOperations.java @@ -954,6 +954,7 @@ public Integer dropCreate() { LOG.info("Running the Native Migrations."); validateAndRunSystemDataMigrations(true); LOG.info("OpenMetadata Database Schema is Updated."); + WorkflowHandler.initialize(config, true); LOG.info("create indexes."); searchRepository.createIndexes(); searchRepository.createOrUpdateIndexTemplates(); @@ -1035,6 +1036,8 @@ public Integer migrate( LOG.info("Migrating the OpenMetadata Schema."); parseConfig(); validateAndRunSystemDataMigrations(force); + LOG.info("Running Flowable schema upgrade."); + WorkflowHandler.initialize(config, true); LOG.info("Update Search Indexes."); searchRepository.updateIndexes(); LOG.info("Update Index Templates."); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/IdempotentDdlDataSourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/IdempotentDdlDataSourceTest.java new file mode 100644 index 000000000000..cca5a6b0f57a --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/IdempotentDdlDataSourceTest.java @@ -0,0 +1,106 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/*
 * Copyright 2021 Collate
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.openmetadata.service.governance.workflows;

import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.DataSource;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Verifies that {@link IdempotentDdlDataSource} wraps every {@code createStatement} overload
 * in an {@link IdempotentDdlStatement} and surfaces SQLExceptions from the real connection
 * without wrapping them in reflection exceptions.
 */
class IdempotentDdlDataSourceTest {

  private DataSource delegate;
  private Connection realConnection;
  private IdempotentDdlDataSource dataSource;

  @BeforeEach
  void setUp() throws Exception {
    delegate = mock(DataSource.class);
    realConnection = mock(Connection.class);
    when(delegate.getConnection()).thenReturn(realConnection);
    dataSource = new IdempotentDdlDataSource(delegate);
  }

  /** Convenience accessor for the proxied connection under test. */
  private Connection proxiedConnection() throws SQLException {
    return dataSource.getConnection();
  }

  @Test
  void createStatementNoArgIsWrapped() throws Exception {
    when(realConnection.createStatement()).thenReturn(mock(Statement.class));

    assertInstanceOf(IdempotentDdlStatement.class, proxiedConnection().createStatement());
  }

  @Test
  void createStatementTwoArgIsWrapped() throws Exception {
    when(realConnection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY))
        .thenReturn(mock(Statement.class));

    Statement wrapped =
        proxiedConnection()
            .createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);

    assertInstanceOf(IdempotentDdlStatement.class, wrapped);
  }

  @Test
  void createStatementThreeArgIsWrapped() throws Exception {
    when(realConnection.createStatement(
            ResultSet.TYPE_FORWARD_ONLY,
            ResultSet.CONCUR_READ_ONLY,
            ResultSet.HOLD_CURSORS_OVER_COMMIT))
        .thenReturn(mock(Statement.class));

    Statement wrapped =
        proxiedConnection()
            .createStatement(
                ResultSet.TYPE_FORWARD_ONLY,
                ResultSet.CONCUR_READ_ONLY,
                ResultSet.HOLD_CURSORS_OVER_COMMIT);

    assertInstanceOf(IdempotentDdlStatement.class, wrapped);
  }

  @Test
  void sqlExceptionFromConnectionSurfacesDirectly() throws Exception {
    SQLException expected = new SQLException("connection refused");
    when(realConnection.createStatement()).thenThrow(expected);

    Connection proxied = proxiedConnection();

    // The proxy must unwrap InvocationTargetException so callers see the original cause.
    assertSame(expected, assertThrows(SQLException.class, proxied::createStatement));
  }

  @Test
  void sqlExceptionFromNonCreateStatementMethodSurfacesDirectly() throws Exception {
    SQLException expected = new SQLException("autocommit failed");
    when(realConnection.getAutoCommit()).thenThrow(expected);

    Connection proxied = proxiedConnection();

    assertSame(expected, assertThrows(SQLException.class, proxied::getAutoCommit));
  }
}
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openmetadata.service.governance.workflows;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for {@code IdempotentDdlStatement}: CREATE INDEX / CREATE TABLE / ALTER TABLE ADD
+ * COLUMN statements whose target object already exists (per {@link DatabaseMetaData}) must be
+ * skipped, while absent objects and non-DDL SQL must pass through to the delegate
+ * {@link Statement} unchanged.
+ */
+class IdempotentDdlStatementTest {
+
+  private Statement delegate;
+  private Connection connection;
+  private DatabaseMetaData meta;
+  private IdempotentDdlStatement stmt;
+
+  @BeforeEach
+  void setUp() throws Exception {
+    delegate = mock(Statement.class);
+    connection = mock(Connection.class);
+    meta = mock(DatabaseMetaData.class);
+    when(connection.getMetaData()).thenReturn(meta);
+    when(connection.getCatalog()).thenReturn("openmetadata");
+    // Default: DB stores lowercase (PostgreSQL behaviour)
+    when(meta.storesLowerCaseIdentifiers()).thenReturn(true);
+    when(meta.storesUpperCaseIdentifiers()).thenReturn(false);
+    stmt = new IdempotentDdlStatement(delegate, connection);
+  }
+
+  // --- CREATE INDEX ---
+
+  @Test
+  void skipsCreateIndexWhenIndexExists() throws Exception {
+    ResultSet rs = mockResultSetWithIndexName("ACT_IDX_BYTEAR_DEPL");
+    when(meta.getIndexInfo(
+            eq("openmetadata"), isNull(), eq("act_ge_bytearray"), anyBoolean(), anyBoolean()))
+        .thenReturn(rs);
+
+    boolean result =
+        stmt.execute("CREATE INDEX ACT_IDX_BYTEAR_DEPL ON ACT_GE_BYTEARRAY (DEPLOYMENT_ID_)");
+
+    assertFalse(result);
+    verify(delegate, never()).execute(anyString());
+  }
+
+  @Test
+  void executesCreateIndexWhenIndexAbsent() throws Exception {
+    ResultSet rs = emptyResultSet();
+    when(meta.getIndexInfo(
+            eq("openmetadata"), isNull(), eq("act_ge_bytearray"), anyBoolean(), anyBoolean()))
+        .thenReturn(rs);
+    when(delegate.execute(anyString())).thenReturn(false);
+
+    stmt.execute("CREATE INDEX ACT_IDX_BYTEAR_DEPL ON ACT_GE_BYTEARRAY (DEPLOYMENT_ID_)");
+
+    verify(delegate)
+        .execute("CREATE INDEX ACT_IDX_BYTEAR_DEPL ON ACT_GE_BYTEARRAY (DEPLOYMENT_ID_)");
+  }
+
+  @Test
+  void skipsCreateUniqueIndex() throws Exception {
+    ResultSet rs = mockResultSetWithIndexName("ACT_UNIQ_MEMB");
+    when(meta.getIndexInfo(
+            eq("openmetadata"), isNull(), eq("act_id_membership"), anyBoolean(), anyBoolean()))
+        .thenReturn(rs);
+
+    boolean result =
+        stmt.execute(
+            "CREATE UNIQUE INDEX ACT_UNIQ_MEMB ON ACT_ID_MEMBERSHIP (USER_ID_, GROUP_ID_)");
+
+    assertFalse(result);
+    verify(delegate, never()).execute(anyString());
+  }
+
+  // --- CREATE TABLE ---
+
+  @Test
+  void skipsCreateTableWhenTableExists() throws Exception {
+    ResultSet rs = singleRowResultSet();
+    when(meta.getTables(eq("openmetadata"), isNull(), eq("act_ru_actinst"), isNull()))
+        .thenReturn(rs);
+
+    boolean result = stmt.execute("CREATE TABLE ACT_RU_ACTINST (ID_ varchar(64) NOT NULL)");
+
+    assertFalse(result);
+    verify(delegate, never()).execute(anyString());
+  }
+
+  @Test
+  void executesCreateTableWhenTableAbsent() throws Exception {
+    ResultSet rs = emptyResultSet();
+    when(meta.getTables(eq("openmetadata"), isNull(), eq("act_ru_actinst"), isNull()))
+        .thenReturn(rs);
+    when(delegate.execute(anyString())).thenReturn(false);
+
+    stmt.execute("CREATE TABLE ACT_RU_ACTINST (ID_ varchar(64) NOT NULL)");
+
+    verify(delegate).execute("CREATE TABLE ACT_RU_ACTINST (ID_ varchar(64) NOT NULL)");
+  }
+
+  // --- ALTER TABLE ADD COLUMN ---
+
+  @Test
+  void skipsAlterTableAddColumnWhenColumnExists() throws Exception {
+    ResultSet rs = mockResultSetWithColumnName("completed_by_");
+    when(meta.getColumns(eq("openmetadata"), isNull(), eq("act_ru_actinst"), isNull()))
+        .thenReturn(rs);
+
+    boolean result =
+        stmt.execute("ALTER TABLE ACT_RU_ACTINST ADD COLUMN COMPLETED_BY_ varchar(255)");
+
+    assertFalse(result);
+    verify(delegate, never()).execute(anyString());
+  }
+
+  @Test
+  void executesAlterTableAddColumnWhenColumnAbsent() throws Exception {
+    ResultSet rs = emptyResultSet();
+    when(meta.getColumns(eq("openmetadata"), isNull(), eq("act_ru_actinst"), isNull()))
+        .thenReturn(rs);
+    when(delegate.execute(anyString())).thenReturn(false);
+
+    stmt.execute("ALTER TABLE ACT_RU_ACTINST ADD COLUMN COMPLETED_BY_ varchar(255)");
+
+    verify(delegate).execute("ALTER TABLE ACT_RU_ACTINST ADD COLUMN COMPLETED_BY_ varchar(255)");
+  }
+
+  @Test
+  void doesNotMatchAlterTableAddConstraint() throws Exception {
+    when(delegate.execute(anyString())).thenReturn(false);
+
+    stmt.execute("ALTER TABLE ACT_RU_TASK ADD CONSTRAINT PK_RU PRIMARY KEY (ID_)");
+
+    verify(delegate).execute("ALTER TABLE ACT_RU_TASK ADD CONSTRAINT PK_RU PRIMARY KEY (ID_)");
+    verify(meta, never()).getColumns(anyString(), anyString(), anyString(), anyString());
+  }
+
+  @Test
+  void doesNotMatchAlterTableAddPrimaryKey() throws Exception {
+    when(delegate.execute(anyString())).thenReturn(false);
+
+    stmt.execute("ALTER TABLE ACT_RU_TASK ADD PRIMARY KEY (ID_)");
+
+    verify(delegate).execute("ALTER TABLE ACT_RU_TASK ADD PRIMARY KEY (ID_)");
+    verify(meta, never()).getColumns(anyString(), anyString(), anyString(), anyString());
+  }
+
+  // --- executeUpdate overloads ---
+
+  @Test
+  void skipsExecuteUpdateForExistingIndex() throws Exception {
+    ResultSet rs = mockResultSetWithIndexName("ACT_IDX_BYTEAR_DEPL");
+    when(meta.getIndexInfo(
+            eq("openmetadata"), isNull(), eq("act_ge_bytearray"), anyBoolean(), anyBoolean()))
+        .thenReturn(rs);
+
+    int result =
+        stmt.executeUpdate("CREATE INDEX ACT_IDX_BYTEAR_DEPL ON ACT_GE_BYTEARRAY (DEPLOYMENT_ID_)");
+
+    assertEquals(0, result);
+    verify(delegate, never()).executeUpdate(anyString());
+  }
+
+  // --- Pass-through ---
+
+  @Test
+  void passesThroughNonDdlStatements() throws Exception {
+    String sql = "SELECT * FROM ACT_GE_PROPERTY WHERE NAME_ = 'common.schema.version'";
+    when(delegate.execute(sql)).thenReturn(true);
+
+    stmt.execute(sql);
+
+    verify(delegate).execute(sql);
+  }
+
+  @Test
+  void skipsCreateIndexOnMysqlWithUpperCaseStoredIdentifiers() throws Exception {
+    // MySQL lower_case_table_names=0: identifiers stored as-is (uppercase)
+    when(meta.storesLowerCaseIdentifiers()).thenReturn(false);
+    when(meta.storesUpperCaseIdentifiers()).thenReturn(true);
+
+    ResultSet rs = mockResultSetWithIndexName("ACT_IDX_BYTEAR_DEPL");
+    when(meta.getIndexInfo(
+            eq("openmetadata"), isNull(), eq("ACT_GE_BYTEARRAY"), anyBoolean(), anyBoolean()))
+        .thenReturn(rs);
+
+    boolean result =
+        stmt.execute("CREATE INDEX ACT_IDX_BYTEAR_DEPL ON ACT_GE_BYTEARRAY (DEPLOYMENT_ID_)");
+
+    assertFalse(result);
+    verify(delegate, never()).execute(anyString());
+  }
+
+  // --- Helpers ---
+
+  private ResultSet mockResultSetWithIndexName(String name) throws Exception {
+    ResultSet rs = mock(ResultSet.class);
+    when(rs.next()).thenReturn(true, false);
+    when(rs.getString("INDEX_NAME")).thenReturn(name);
+    return rs;
+  }
+
+  private ResultSet mockResultSetWithColumnName(String name) throws Exception {
+    ResultSet rs = mock(ResultSet.class);
+    when(rs.next()).thenReturn(true, false);
+    when(rs.getString("COLUMN_NAME")).thenReturn(name);
+    return rs;
+  }
+
+  private ResultSet singleRowResultSet() throws Exception {
+    ResultSet rs = mock(ResultSet.class);
+    when(rs.next()).thenReturn(true);
+    return rs;
+  }
+
+  private ResultSet emptyResultSet() throws Exception {
+    ResultSet rs = mock(ResultSet.class);
+    when(rs.next()).thenReturn(false);
+    return rs;
+  }
+}
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/WorkflowHandlerSchemaUpdateTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/WorkflowHandlerSchemaUpdateTest.java
new file mode 100644
index 000000000000..107de45e2ee1
--- /dev/null
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/WorkflowHandlerSchemaUpdateTest.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2024 Collate
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.openmetadata.service.governance.workflows; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import org.flowable.common.engine.api.FlowableWrongDbException; +import org.flowable.engine.ProcessEngine; +import org.flowable.engine.ProcessEngineConfiguration; +import org.flowable.engine.ProcessEngines; +import org.flowable.engine.impl.cfg.StandaloneProcessEngineConfiguration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.openmetadata.schema.configuration.WorkflowSettings; +import org.openmetadata.service.Entity; +import org.openmetadata.service.OpenMetadataApplicationConfig; +import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; +import org.openmetadata.service.jdbi3.HikariCPDataSourceFactory; +import org.openmetadata.service.jdbi3.SystemRepository; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class WorkflowHandlerSchemaUpdateTest { + + @BeforeEach + @AfterEach + void resetWorkflowHandlerState() throws ReflectiveOperationException { + setStaticField("initialized", false); + 
setStaticField("instance", null); + } + + @Test + void runtimeModeWrapsFlowableWrongDbExceptionWithActionableMessage() { + try (MockedConstruction engineMock = + mockConstruction( + StandaloneProcessEngineConfiguration.class, + (mock, ctx) -> + when(mock.buildProcessEngine()) + .thenThrow(new FlowableWrongDbException("7.2.0.2", "7.1.0.0"))); + MockedStatic ignored = mockStatic(ProcessEngines.class); + MockedStatic entityMock = mockStatic(Entity.class); + MockedStatic pscMock = + mockStatic(PipelineServiceClientFactory.class)) { + + setupEntityMock(entityMock); + pscMock + .when(() -> PipelineServiceClientFactory.createPipelineServiceClient(any())) + .thenReturn(null); + + IllegalStateException ex = + assertThrows( + IllegalStateException.class, + () -> WorkflowHandler.initialize(buildMockConfig(), false)); + + assertTrue(ex.getMessage().contains("openmetadata-ops.sh migrate")); + assertInstanceOf(FlowableWrongDbException.class, ex.getCause()); + } + } + + @Test + void migrationModeSetsDbSchemaUpdateTrue() { + ProcessEngine mockEngine = mock(ProcessEngine.class, RETURNS_DEEP_STUBS); + + try (MockedConstruction engineMock = + mockConstruction( + StandaloneProcessEngineConfiguration.class, + (mock, ctx) -> when(mock.buildProcessEngine()).thenReturn(mockEngine)); + MockedStatic ignored = mockStatic(ProcessEngines.class); + MockedStatic entityMock = mockStatic(Entity.class); + MockedStatic pscMock = + mockStatic(PipelineServiceClientFactory.class)) { + + setupEntityMock(entityMock); + pscMock + .when(() -> PipelineServiceClientFactory.createPipelineServiceClient(any())) + .thenReturn(null); + + WorkflowHandler.initialize(buildMockConfig(), true); + + StandaloneProcessEngineConfiguration engineConfig = engineMock.constructed().getLast(); + verify(engineConfig) + .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE); + } + } + + @Test + void runtimeModeSetsDbSchemaUpdateFalse() { + try (MockedConstruction engineMock = + mockConstruction( + 
StandaloneProcessEngineConfiguration.class, + (mock, ctx) -> + when(mock.buildProcessEngine()) + .thenThrow(new FlowableWrongDbException("7.2.0.2", "7.1.0.0"))); + MockedStatic ignored = mockStatic(ProcessEngines.class); + MockedStatic entityMock = mockStatic(Entity.class); + MockedStatic pscMock = + mockStatic(PipelineServiceClientFactory.class)) { + + setupEntityMock(entityMock); + pscMock + .when(() -> PipelineServiceClientFactory.createPipelineServiceClient(any())) + .thenReturn(null); + + assertThrows( + IllegalStateException.class, () -> WorkflowHandler.initialize(buildMockConfig(), false)); + + StandaloneProcessEngineConfiguration engineConfig = engineMock.constructed().getLast(); + verify(engineConfig) + .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_FALSE); + } + } + + // ── Helpers ────────────────────────────────────────────────────────────────── + + private void setupEntityMock(MockedStatic entityMock) { + SystemRepository systemRepository = mock(SystemRepository.class); + WorkflowSettings workflowSettings = mock(WorkflowSettings.class, RETURNS_DEEP_STUBS); + entityMock.when(Entity::getSystemRepository).thenReturn(systemRepository); + lenient().when(systemRepository.getWorkflowSettingsOrDefault()).thenReturn(workflowSettings); + } + + private OpenMetadataApplicationConfig buildMockConfig() { + OpenMetadataApplicationConfig config = mock(OpenMetadataApplicationConfig.class); + HikariCPDataSourceFactory dsf = mock(HikariCPDataSourceFactory.class); + lenient().when(config.getDataSourceFactory()).thenReturn(dsf); + lenient().when(dsf.getUrl()).thenReturn("jdbc:postgresql://localhost:5432/openmetadata_db"); + lenient().when(dsf.getUser()).thenReturn("openmetadata_user"); + lenient().when(dsf.getPassword()).thenReturn("openmetadata_password"); + lenient().when(dsf.getDriverClass()).thenReturn("org.postgresql.Driver"); + lenient().when(config.getPipelineServiceClientConfiguration()).thenReturn(null); + return config; + } + + private 
static void setStaticField(String fieldName, Object value) + throws ReflectiveOperationException { + Field field = WorkflowHandler.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(null, value); + } +}