diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index facbcf3a6..4aeaa6ff8 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -79,6 +79,7 @@ public class RunSync {
   private static final String CONTINUOUS_MODE = "m";
   private static final String CONTINUOUS_MODE_INTERVAL = "t";
   private static final String HELP_OPTION = "h";
+  private static final String SYNC_TIMEOUT_OPTION = "timeout";
 
   private static final Options OPTIONS =
       new Options()
@@ -115,6 +116,11 @@ public class RunSync {
               "continuousModeInterval",
               true,
               "The interval in seconds to schedule the loop. Requires --continuousMode to be set. Defaults to 5 seconds.")
+          .addOption(
+              SYNC_TIMEOUT_OPTION,
+              "syncTimeout",
+              true,
+              "The maximum time in seconds allowed for a single table sync before timing out. Defaults to no timeout.")
           .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");
 
   static SourceTable sourceTableBuilder(
@@ -155,38 +161,80 @@ static List targetTableBuilder(
     return targetTables;
   }
 
+  /**
+   * Syncs every table in {@code datasetConfig} to the formats in {@code tableFormatList}, one
+   * table at a time. A failed or timed-out sync is logged and the loop moves on to the next
+   * table; if the calling thread is interrupted while waiting, the remaining tables are skipped.
+   *
+   * @param timeoutInSeconds maximum time to wait for a single table sync; a value of 0 or less
+   *     runs the sync on the calling thread with no timeout
+   */
-  static void syncTableMetdata(
+  static void syncTableMetadata(
       DatasetConfig datasetConfig,
       List tableFormatList,
       CatalogConfig catalogConfig,
       Configuration hadoopConf,
-      ConversionSourceProvider conversionSourceProvider) {
+      ConversionSourceProvider conversionSourceProvider,
+      long timeoutInSeconds) {
     ConversionController conversionController = new ConversionController(hadoopConf);
-    for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
-      log.info(
-          "Running sync for basePath {} for following table formats {}",
-          table.getTableBasePath(),
-          tableFormatList);
-      Properties sourceProperties = new Properties();
-      if (table.getPartitionSpec() != null) {
-        sourceProperties.put(
-            HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec());
-      }
+    // use a single-thread executor since tasks are processed sequentially
+    java.util.concurrent.ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
+    try {
 
-      SourceTable sourceTable =
-          sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties);
-      List targetTables = targetTableBuilder(table, catalogConfig, tableFormatList);
-      ConversionConfig conversionConfig =
-          ConversionConfig.builder()
-              .sourceTable(sourceTable)
-              .targetTables(targetTables)
-              .syncMode(SyncMode.INCREMENTAL)
-              .build();
-      try {
-        conversionController.sync(conversionConfig, conversionSourceProvider);
-      } catch (Exception e) {
-        log.error("Error running sync for {}", table.getTableBasePath(), e);
+      for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
+        log.info(
+            "Running sync for basePath {} for following table formats {}",
+            table.getTableBasePath(),
+            tableFormatList);
+        Properties sourceProperties = new Properties();
+        if (table.getPartitionSpec() != null) {
+          sourceProperties.put(
+              HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec());
+        }
+
+        SourceTable sourceTable =
+            sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties);
+        List targetTables = targetTableBuilder(table, catalogConfig, tableFormatList);
+        ConversionConfig conversionConfig =
+            ConversionConfig.builder()
+                .sourceTable(sourceTable)
+                .targetTables(targetTables)
+                .syncMode(SyncMode.INCREMENTAL)
+                .build();
+        if (timeoutInSeconds > 0) {
+          java.util.concurrent.Future<?> standardFuture =
+              syncExecutor.submit(
+                  () -> {
+                    conversionController.sync(conversionConfig, conversionSourceProvider);
+                  });
+          try {
+            standardFuture.get(timeoutInSeconds, TimeUnit.SECONDS);
+          } catch (java.util.concurrent.TimeoutException e) {
+            log.error(
+                "Sync timed out for {} after {} seconds. Triggering thread interrupt.",
+                table.getTableBasePath(),
+                timeoutInSeconds);
+            standardFuture.cancel(true);
+          } catch (java.util.concurrent.ExecutionException e) {
+            log.error("Error running sync for {}", table.getTableBasePath(), e.getCause());
+          } catch (InterruptedException e) {
+            log.error("Sync main runner thread was interrupted", e);
+            standardFuture.cancel(true);
+            Thread.currentThread().interrupt();
+            // the interrupt flag is set again, so every further get() would fail immediately
+            break;
+          }
+        } else {
+          // fallback to original synchronous behavior if no timeout option is provided
+          try {
+            conversionController.sync(conversionConfig, conversionSourceProvider);
+          } catch (Exception e) {
+            log.error("Error running sync for {}", table.getTableBasePath(), e);
+          }
+        }
       }
+    } finally {
+      syncExecutor.shutdownNow();
     }
   }
 
@@ -288,14 +336,34 @@ private static void runSync(CommandLine cmd) throws IOException {
     String icebergCatalogConfigpath = getValueFromConfig(cmd, ICEBERG_CATALOG_CONFIG_PATH);
     String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH);
     String conversionProviderConfigpath = getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH);
+    String rawTimeout = getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION);
+    long timeoutInSeconds = 0L;
+    if (rawTimeout != null && !rawTimeout.isEmpty()) {
+      com.google.common.base.Preconditions.checkArgument(
+          rawTimeout.matches("^\\d+$"),
+          "Invalid value for --syncTimeout: '%s' must be a valid non-negative integer.",
+          rawTimeout);
+      try {
+        timeoutInSeconds = Long.parseLong(rawTimeout);
+      } catch (NumberFormatException e) {
+        // all digits, but too many of them to fit in a long
+        throw new IllegalArgumentException(
+            "Invalid value for --syncTimeout: '" + rawTimeout + "' is out of range.", e);
+      }
+    }
     DatasetConfig datasetConfig = getDatasetConfig(datasetConfigpath);
     CatalogConfig catalogConfig = getIcebergCatalogConfig(icebergCatalogConfigpath);
     Configuration hadoopConf = gethadoopConf(hadoopConfigpath);
     ConversionSourceProvider conversionSourceProvider =
         getConversionSourceProvider(conversionProviderConfigpath, datasetConfig, hadoopConf);
     List tableFormatList = datasetConfig.getTargetFormats();
-    syncTableMetdata(
-        datasetConfig, tableFormatList, catalogConfig, hadoopConf, conversionSourceProvider);
+    syncTableMetadata(
+        datasetConfig,
+        tableFormatList,
+        catalogConfig,
+        hadoopConf,
+        conversionSourceProvider,
+        timeoutInSeconds);
   }
 
   static byte[] getCustomConfigurations(String Configpath) throws IOException {
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
index f18ce867f..59d3768b6 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
@@ -93,6 +93,53 @@ void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
     }
   }
 
+  @Test
+  void testSyncCompletesBeforeTimeout(@TempDir Path tempDir) throws IOException {
+    String tableName = "tiny-table";
+    try (GenericTable table =
+        TestJavaHudiTable.forStandardSchema(
+            tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
+      table.insertRows(5); // small numRows for it to finish before the timeout does
+      File configFile = writeConfigFile(tempDir, table, tableName);
+
+      String[] args = new String[] {"--datasetConfig", configFile.getPath(), "--syncTimeout", "60"};
+      RunSync.main(args);
+      // check successful sync
+      Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
+      waitForNumIcebergCommits(icebergMetadataPath, 3);
+    }
+  }
+
+  @Test
+  void testSyncTimeoutFires(@TempDir Path tempDir) throws IOException {
+    String tableName = "large-table";
+    try (GenericTable table =
+        TestJavaHudiTable.forStandardSchema(
+            tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
+
+      table.insertRows(5000);
+      File configFile = writeConfigFile(tempDir, table, tableName);
+
+      long startTime = System.currentTimeMillis();
+
+      String[] args =
+          new String[] {
+            "--datasetConfig",
+            configFile.getPath(),
+            "--syncTimeout",
+            "1" // rapid timeout compared to the large table
+          };
+      RunSync.main(args);
+
+      long durationInSeconds = (System.currentTimeMillis() - startTime) / 1000;
+
+      org.junit.jupiter.api.Assertions.assertTrue(
+          durationInSeconds < 3, // being generous with 3 secs
+          "The sync should have cut off and stopped immediately around 1 second. Took: "
+              + durationInSeconds);
+    }
+  }
+
   private static File writeConfigFile(Path tempDir, GenericTable table, String tableName)
       throws IOException {
     RunSync.DatasetConfig config =