diff --git a/.github/workflows/update_build_status.yml b/.github/workflows/update_build_status.yml index d6860fb3dc47..2194cb552821 100644 --- a/.github/workflows/update_build_status.yml +++ b/.github/workflows/update_build_status.yml @@ -36,77 +36,175 @@ jobs: with: github-token: ${{ secrets.GITHUB_TOKEN }} script: | - const endpoint = 'GET /repos/:owner/:repo/pulls?state=:state' - const params = { - owner: context.repo.owner, - repo: context.repo.repo, - state: 'open' + const SEARCH_WINDOW_DAYS = 7; + const BATCH_SIZE = 10; + const maybeReady = ['behind', 'clean', 'draft', 'has_hooks', 'unknown', 'unstable']; + + function getSearchSince() { + const windowStart = new Date(Date.now() - SEARCH_WINDOW_DAYS * 24 * 60 * 60 * 1000); + return windowStart.toISOString().replace(/\.\d{3}Z$/, 'Z'); } - // See https://docs.github.com/en/graphql/reference/enums#mergestatestatus - const maybeReady = ['behind', 'clean', 'draft', 'has_hooks', 'unknown', 'unstable']; + async function searchActivePullRequests() { + const since = getSearchSince(); + const query = + `repo:${context.repo.owner}/${context.repo.repo} is:pr is:open updated:>=${since}`; + const items = []; + let page = 1; + + console.log(`Searching active PRs with query: ${query}`); + + while (true) { + const response = await github.request('GET /search/issues', { + q: query, + page, + per_page: 100, + sort: 'updated', + order: 'desc' + }); + const pageItems = response.data.items || []; + items.push(...pageItems); + console.log(`Fetched ${pageItems.length} PRs from search page ${page}`); + + if (pageItems.length < 100) { + break; + } + page += 1; + } + + return items; + } + + function shouldSkipPatch(checkRun, workflowRun) { + const sameStatus = checkRun.status === workflowRun.status; + const sameConclusion = + workflowRun.status !== 'completed' || checkRun.conclusion === workflowRun.conclusion; + const sameDetailsUrl = checkRun.details_url === workflowRun.details_url; - // Iterate open PRs - for await (const prs of github.paginate.iterator(endpoint,params)) { - // Each page - for await (const pr of prs.data) { - console.log('SHA: ' + pr.head.sha) - console.log(' Mergeable status: ' + pr.mergeable_state) - if (pr.mergeable_state == null || maybeReady.includes(pr.mergeable_state)) { - const checkRuns = await github.request('GET /repos/{owner}/{repo}/commits/{ref}/check-runs', { + return sameStatus && sameConclusion && sameDetailsUrl; + } + + async function processPullRequest(searchItem) { + const prNumber = searchItem.number; + + try { + const pr = ( + await github.request('GET /repos/{owner}/{repo}/pulls/{pull_number}', { owner: context.repo.owner, repo: context.repo.repo, - ref: pr.head.sha + pull_number: prNumber }) + ).data; + + console.log(`PR #${pr.number} SHA: ${pr.head.sha}`); + console.log(` Mergeable status: ${pr.mergeable_state}`); + if (pr.mergeable_state != null && !maybeReady.includes(pr.mergeable_state)) { + console.log(` Skip PR #${pr.number}: mergeable state ${pr.mergeable_state}`); + return; + } - // Iterator GitHub Checks in the PR - for await (const cr of checkRuns.data.check_runs) { - if (cr.name == 'Build' && cr.conclusion != "action_required") { - // text contains parameters to make request in JSON. - const params = JSON.parse(cr.output.text) - - // Get the workflow run in the forked repository - let run - try { - run = await github.request('GET /repos/{owner}/{repo}/actions/runs/{run_id}', params) - } catch (error) { - console.error(error) - // Run not found. This can happen when the PR author removes GitHub Actions runs or - // disalbes GitHub Actions. - continue - } - - // Keep syncing the status of the checks - try { - if (run.data.status == 'completed') { - console.log(' Run ' + cr.id + ': set status (' + run.data.status + ') and conclusion (' + run.data.conclusion + ')') - const response = await github.request('PATCH /repos/{owner}/{repo}/check-runs/{check_run_id}', { - owner: context.repo.owner, - repo: context.repo.repo, - check_run_id: cr.id, - output: cr.output, - status: run.data.status, - conclusion: run.data.conclusion, - details_url: run.data.details_url - }) - } else { - console.log(' Run ' + cr.id + ': set status (' + run.data.status + ')') - const response = await github.request('PATCH /repos/{owner}/{repo}/check-runs/{check_run_id}', { - owner: context.repo.owner, - repo: context.repo.repo, - check_run_id: cr.id, - output: cr.output, - status: run.data.status, - details_url: run.data.details_url - }) - } - } catch (error) { - console.error(error) - continue - } - break - } + const checkRuns = await github.request( + 'GET /repos/{owner}/{repo}/commits/{ref}/check-runs', + { + owner: context.repo.owner, + repo: context.repo.repo, + ref: pr.head.sha } + ); + + const buildCheckRun = checkRuns.data.check_runs.find( + (checkRun) => + checkRun.name === 'Build' && checkRun.conclusion !== 'action_required' + ); + + if (!buildCheckRun) { + console.log(` Skip PR #${pr.number}: no eligible Build check run`); + return; } + + if (!buildCheckRun.output || !buildCheckRun.output.text) { + console.log(` Skip PR #${pr.number}: Build check run ${buildCheckRun.id} has no run metadata`); + return; + } + + let workflowRunParams; + try { + workflowRunParams = JSON.parse(buildCheckRun.output.text); + } catch (error) { + console.error(` Skip PR #${pr.number}: invalid JSON in check run ${buildCheckRun.id}`, error); + return; + } + + let workflowRun; + try { + workflowRun = ( + await github.request( + 'GET /repos/{owner}/{repo}/actions/runs/{run_id}', + workflowRunParams + ) + ).data; + } catch (error) { + console.error(` Skip PR #${pr.number}: workflow run lookup failed`, error); + return; + } + + if (shouldSkipPatch(buildCheckRun, workflowRun)) { + console.log( + ` Skip PR #${pr.number}: Build check run ${buildCheckRun.id} already matches ${workflowRun.status}/${workflowRun.conclusion}` + ); + return; + } + + const patchParams = { + owner: context.repo.owner, + repo: context.repo.repo, + check_run_id: buildCheckRun.id, + output: buildCheckRun.output, + status: workflowRun.status, + details_url: workflowRun.details_url + }; + + if (workflowRun.status === 'completed') { + patchParams.conclusion = workflowRun.conclusion; + console.log( + ` Patch PR #${pr.number} check run ${buildCheckRun.id}: ${buildCheckRun.status}/${buildCheckRun.conclusion} -> ${workflowRun.status}/${workflowRun.conclusion}` + ); + } else { + console.log( + ` Patch PR #${pr.number} check run ${buildCheckRun.id}: ${buildCheckRun.status} -> ${workflowRun.status}` + ); + } + + await github.request( + 'PATCH /repos/{owner}/{repo}/check-runs/{check_run_id}', + patchParams + ); + } catch (error) { + console.error(`Failed to sync PR #${prNumber}`, error); + } + } + + async function processInBatches(items) { + for (let index = 0; index < items.length; index += BATCH_SIZE) { + const batch = items.slice(index, index + BATCH_SIZE); + console.log( + `Processing batch ${Math.floor(index / BATCH_SIZE) + 1} with ${batch.length} PRs` + ); + await Promise.all(batch.map((item) => processPullRequest(item))); + } + } + + try { + const activePullRequests = await searchActivePullRequests(); + console.log(`Found ${activePullRequests.length} active PRs to evaluate`); + + if (activePullRequests.length === 0) { + console.log('No active PRs matched the search window'); + return; } - } \ No newline at end of file + + await processInBatches(activePullRequests); + } catch (error) { + console.error('Update build status workflow failed', error); + throw error; + } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/AbstractKingbaseContainerTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/AbstractKingbaseContainerTest.java index c4be8aa731c4..b9a96f97b885 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/AbstractKingbaseContainerTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/AbstractKingbaseContainerTest.java @@ -23,6 +23,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; import org.testcontainers.containers.GenericContainer; @@ -44,6 +45,7 @@ * errors, please replace the image with a newly built one that contains a valid license. */ @DisabledOnOs(OS.WINDOWS) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class AbstractKingbaseContainerTest { protected static final String KINGBASE_IMAGE = "liangyaobo/kingbase:v8r6-license"; @@ -53,12 +55,12 @@ public abstract class AbstractKingbaseContainerTest { protected static final String SCHEMA = "public"; protected static final int KINGBASE_PORT = 54321; - protected static GenericContainer kingbaseContainer; - protected static Connection connection; - protected static KingbaseCatalog catalog; + protected GenericContainer kingbaseContainer; + protected String jdbcUrl; + protected KingbaseCatalog catalog; @BeforeAll - public static void startContainer() throws SQLException { + public void startContainer() throws SQLException { DockerImageName imageName = DockerImageName.parse(KINGBASE_IMAGE); kingbaseContainer = @@ -73,9 +75,9 @@ public static void startContainer() throws SQLException { String host = kingbaseContainer.getHost(); Integer mappedPort = kingbaseContainer.getMappedPort(KINGBASE_PORT); - String jdbcUrl = String.format("jdbc:kingbase8://%s:%d/%s", host, mappedPort, DATABASE); + jdbcUrl = String.format("jdbc:kingbase8://%s:%d/%s", host, mappedPort, DATABASE); - connection = connectWithRetry(jdbcUrl, USERNAME, PASSWORD); + waitUntilSqlReady(); catalog = new KingbaseCatalog( @@ -89,24 +91,49 @@ public static void startContainer() throws SQLException { } @AfterAll - public static void stopContainer() throws SQLException { + public void stopContainer() { if (catalog != null) { catalog.close(); } - if (connection != null && !connection.isClosed()) { - connection.close(); - } if (kingbaseContainer != null) { kingbaseContainer.stop(); } } protected void executeSql(String sql) throws SQLException { - try (Statement stmt = connection.createStatement()) { + try (Connection connection = getConnection(); + Statement stmt = connection.createStatement()) { stmt.execute(sql); } } + protected Connection getConnection() throws SQLException { + return connectWithRetry(jdbcUrl, USERNAME, PASSWORD); + } + + private void waitUntilSqlReady() throws SQLException { + RetryUtils.RetryMaterial retryMaterial = + new RetryUtils.RetryMaterial(30, true, exception -> true, 2000); + + try { + RetryUtils.retryWithException( + () -> { + try (Connection connection = + DriverManager.getConnection(jdbcUrl, USERNAME, PASSWORD); + Statement stmt = connection.createStatement()) { + stmt.execute("SELECT 1"); + } + return null; + }, + retryMaterial); + } catch (Exception e) { + if (e instanceof SQLException) { + throw (SQLException) e; + } + throw new SQLException("Kingbase is not ready to execute SQL", e); + } + } + private static Connection connectWithRetry(String jdbcUrl, String username, String password) throws SQLException { RetryUtils.RetryMaterial retryMaterial = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseCatalogContainerTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseCatalogContainerTest.java index 936ddc42f439..7de73e62908a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseCatalogContainerTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseCatalogContainerTest.java @@ -95,7 +95,7 @@ public void testTableExists() throws SQLException { } @Test - public void testCreateTableViaAPI() throws SQLException { + public void testCreateTableViaAPI() { String testTableName = "test_api_create_table"; TablePath tablePath = TablePath.of(DATABASE, SCHEMA, testTableName); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseDialectContainerTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseDialectContainerTest.java index 29cec6376350..c195d9d3539b 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseDialectContainerTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/container/KingbaseDialectContainerTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -41,36 +42,37 @@ @DisabledOnOs(OS.WINDOWS) public class KingbaseDialectContainerTest extends AbstractKingbaseContainerTest { - private static KingbaseDialect dialect; private static final String TEST_TABLE = "dialect_test_table"; + private KingbaseDialect dialect; @BeforeAll - public static void setupDialect() throws SQLException { + public void setupDialect() throws SQLException { dialect = new KingbaseDialect(); - String createTableSql = - String.format( - "CREATE TABLE %s.%s (" - + "id INT8 PRIMARY KEY, " - + "name VARCHAR(100), " - + "value NUMERIC(10,2), " - + "created_at TIMESTAMP" - + ")", - quoteIdentifier(SCHEMA), quoteIdentifier(TEST_TABLE)); + String qualifiedTable = + String.format("%s.%s", quoteIdentifier(SCHEMA), quoteIdentifier(TEST_TABLE)); - try (Statement stmt = connection.createStatement()) { - stmt.execute(createTableSql); - } + String createTableSql = + "CREATE TABLE IF NOT EXISTS " + + qualifiedTable + + " (" + + "id BIGINT PRIMARY KEY, " + + "name VARCHAR(100), " + + "value NUMERIC(10,2), " + + "created_at TIMESTAMP" + + ")"; + executeSql(createTableSql); + + String truncateSql = "TRUNCATE TABLE " + qualifiedTable; + executeSql(truncateSql); // Insert test data String insertSql = - String.format( - "INSERT INTO %s.%s (id, name, value, created_at) " - + "VALUES (1, 'test1', 100.50, CURRENT_TIMESTAMP)", - quoteIdentifier(SCHEMA), quoteIdentifier(TEST_TABLE)); - try (Statement stmt = connection.createStatement()) { - stmt.execute(insertSql); - } + "INSERT INTO " + + qualifiedTable + + " (id, name, value, created_at) " + + "VALUES (1, 'test1', 100.50, CURRENT_TIMESTAMP)"; + executeSql(insertSql); } @Test @@ -221,7 +223,8 @@ public void testRealUpsertExecution() throws SQLException { executeSql(insertSql); // Verify insert - try (Statement stmt = connection.createStatement(); + try (Connection connection = getConnection(); + Statement stmt = connection.createStatement(); ResultSet rs = stmt.executeQuery( String.format( @@ -290,20 +293,15 @@ public void testFieldIdeHandling() { @Test public void testCreatPreparedStatement() throws SQLException { - PreparedStatement ps = null; - try { - String sql = - String.format( - "SELECT * FROM %s.%s", - quoteIdentifier(SCHEMA), quoteIdentifier(TEST_TABLE)); - ps = dialect.creatPreparedStatement(connection, sql, 100); + String sql = + String.format( + "SELECT * FROM %s.%s", + quoteIdentifier(SCHEMA), quoteIdentifier(TEST_TABLE)); + try (Connection connection = getConnection(); + PreparedStatement ps = dialect.creatPreparedStatement(connection, sql, 100)) { Assertions.assertNotNull(ps); Assertions.assertEquals(100, ps.getFetchSize()); - } finally { - if (ps != null) { - ps.close(); - } } }