From 498f536298c4a4615a6436d87ff27023047d3b05 Mon Sep 17 00:00:00 2001 From: Selim Soufargi Date: Mon, 18 May 2026 13:07:24 +0200 Subject: [PATCH 1/6] timeout feature for runSync (per table) --- .../org/apache/xtable/utilities/RunSync.java | 77 ++++++++++++++----- 1 file changed, 58 insertions(+), 19 deletions(-) diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java index facbcf3a6..145a57bc8 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -79,6 +80,7 @@ public class RunSync { private static final String CONTINUOUS_MODE = "m"; private static final String CONTINUOUS_MODE_INTERVAL = "t"; private static final String HELP_OPTION = "h"; + private static final String SYNC_TIMEOUT_OPTION = "timeout"; private static final Options OPTIONS = new Options() @@ -115,7 +117,13 @@ public class RunSync { "continuousModeInterval", true, "The interval in seconds to schedule the loop. Requires --continuousMode to be set. Defaults to 5 seconds.") - .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility"); + .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility") + .addOption( + SYNC_TIMEOUT_OPTION, + "syncTimeout", + true, + "The maximum time in seconds allowed for a single table sync before timing out. Defaults to no timeout.") + .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");; static SourceTable sourceTableBuilder( @NonNull DatasetConfig.Table table, @@ -155,37 +163,67 @@ static List targetTableBuilder( return targetTables; } - static void syncTableMetdata( + static void syncTableMetadata( DatasetConfig datasetConfig, List tableFormatList, CatalogConfig catalogConfig, Configuration hadoopConf, - ConversionSourceProvider conversionSourceProvider) { + ConversionSourceProvider conversionSourceProvider, + long timeoutInSeconds) { ConversionController conversionController = new ConversionController(hadoopConf); + java.util.concurrent.ExecutorService syncExecutor = Executors.newCachedThreadPool(); + java.util.concurrent.ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1); for (DatasetConfig.Table table : datasetConfig.getDatasets()) { log.info( - "Running sync for basePath {} for following table formats {}", - table.getTableBasePath(), - tableFormatList); + "Running sync for basePath {} for following table formats {}", + table.getTableBasePath(), + tableFormatList); Properties sourceProperties = new Properties(); if (table.getPartitionSpec() != null) { sourceProperties.put( - HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec()); + HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec()); } SourceTable sourceTable = - sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties); + sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties); List targetTables = targetTableBuilder(table, catalogConfig, tableFormatList); ConversionConfig conversionConfig = - ConversionConfig.builder() - .sourceTable(sourceTable) - .targetTables(targetTables) - .syncMode(SyncMode.INCREMENTAL) - .build(); - try { - conversionController.sync(conversionConfig, conversionSourceProvider); - } catch (Exception e) { - log.error("Error running sync for {}", table.getTableBasePath(), e); + ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) + .syncMode(SyncMode.INCREMENTAL) + .build(); + if (timeoutInSeconds > 0) { + CompletableFuture syncFuture = CompletableFuture.runAsync(() -> { + conversionController.sync(conversionConfig, conversionSourceProvider); + }, syncExecutor); + + delayer.schedule(() -> { + if (!syncFuture.isDone()) { + // end complete the future exceptionally with a TimeoutException + syncFuture.completeExceptionally(new java.util.concurrent.TimeoutException( + "Sync timed out for " + table.getTableBasePath())); + syncFuture.cancel(true); // send interrupt signal to the executing worker thread + } + }, timeoutInSeconds, TimeUnit.SECONDS); + + // wait for whichever happens first (completion or timeout exception) + try { + syncFuture.join(); + } catch (java.util.concurrent.CompletionException e) { + if (e.getCause() instanceof java.util.concurrent.TimeoutException) { + log.error("Sync timed out for {} after {} seconds", table.getTableBasePath(), timeoutInSeconds); + } else { + log.error("Error running sync for {}", table.getTableBasePath(), e.getCause()); + } + } + } else { + // fallback to original synchronous behavior if no timeout option is provided + try { + conversionController.sync(conversionConfig, conversionSourceProvider); + } catch (Exception e) { + log.error("Error running sync for {}", table.getTableBasePath(), e); + } } } } @@ -288,14 +326,15 @@ private static void runSync(CommandLine cmd) throws IOException { String icebergCatalogConfigpath = getValueFromConfig(cmd, ICEBERG_CATALOG_CONFIG_PATH); String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH); String conversionProviderConfigpath = getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH); + long timeoutInSeconds = Long.parseLong(getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION)); DatasetConfig datasetConfig = getDatasetConfig(datasetConfigpath); CatalogConfig catalogConfig = getIcebergCatalogConfig(icebergCatalogConfigpath); Configuration hadoopConf = gethadoopConf(hadoopConfigpath); ConversionSourceProvider conversionSourceProvider = getConversionSourceProvider(conversionProviderConfigpath, datasetConfig, hadoopConf); List tableFormatList = datasetConfig.getTargetFormats(); - syncTableMetdata( - datasetConfig, tableFormatList, catalogConfig, hadoopConf, conversionSourceProvider); + syncTableMetadata( + datasetConfig, tableFormatList, catalogConfig, hadoopConf, conversionSourceProvider, timeoutInSeconds); } static byte[] getCustomConfigurations(String Configpath) throws IOException { From dae4d2fd930973ff506ec6f24afdad9c56568b2c Mon Sep 17 00:00:00 2001 From: Selim Soufargi Date: Mon, 18 May 2026 19:14:15 +0200 Subject: [PATCH 2/6] handle case of timeout=null --- .../src/main/java/org/apache/xtable/utilities/RunSync.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java index 145a57bc8..2f09bfb2d 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java @@ -326,7 +326,7 @@ private static void runSync(CommandLine cmd) throws IOException { String icebergCatalogConfigpath = getValueFromConfig(cmd, ICEBERG_CATALOG_CONFIG_PATH); String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH); String conversionProviderConfigpath = getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH); - long timeoutInSeconds = Long.parseLong(getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION)); + long timeoutInSeconds = getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION) == null || getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION).isEmpty() ? 0L : Long.parseLong(getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION)); DatasetConfig datasetConfig = getDatasetConfig(datasetConfigpath); CatalogConfig catalogConfig = getIcebergCatalogConfig(icebergCatalogConfigpath); Configuration hadoopConf = gethadoopConf(hadoopConfigpath); From af92091ee34976ec4b129d85f4ea633a20ac092b Mon Sep 17 00:00:00 2001 From: Selim Soufargi Date: Mon, 18 May 2026 21:25:10 +0200 Subject: [PATCH 3/6] spotless:apply --- .../org/apache/xtable/utilities/RunSync.java | 82 ++++++++++++------- 1 file changed, 51 insertions(+), 31 deletions(-) diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java index 2f09bfb2d..cfbb3d638 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java @@ -118,12 +118,13 @@ public class RunSync { true, "The interval in seconds to schedule the loop. Requires --continuousMode to be set. Defaults to 5 seconds.") .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility") - .addOption( - SYNC_TIMEOUT_OPTION, - "syncTimeout", - true, - "The maximum time in seconds allowed for a single table sync before timing out. Defaults to no timeout.") - .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");; + .addOption( + SYNC_TIMEOUT_OPTION, + "syncTimeout", + true, + "The maximum time in seconds allowed for a single table sync before timing out. Defaults to no timeout.") + .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility"); + ; static SourceTable sourceTableBuilder( @NonNull DatasetConfig.Table table, @@ -175,44 +176,54 @@ static void syncTableMetadata( java.util.concurrent.ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1); for (DatasetConfig.Table table : datasetConfig.getDatasets()) { log.info( - "Running sync for basePath {} for following table formats {}", - table.getTableBasePath(), - tableFormatList); + "Running sync for basePath {} for following table formats {}", + table.getTableBasePath(), + tableFormatList); Properties sourceProperties = new Properties(); if (table.getPartitionSpec() != null) { sourceProperties.put( - HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec()); + HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec()); } SourceTable sourceTable = - sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties); + sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties); List targetTables = targetTableBuilder(table, catalogConfig, tableFormatList); ConversionConfig conversionConfig = - ConversionConfig.builder() - .sourceTable(sourceTable) - .targetTables(targetTables) - .syncMode(SyncMode.INCREMENTAL) - .build(); + ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) + .syncMode(SyncMode.INCREMENTAL) + .build(); if (timeoutInSeconds > 0) { - CompletableFuture syncFuture = CompletableFuture.runAsync(() -> { - conversionController.sync(conversionConfig, conversionSourceProvider); - }, syncExecutor); - - delayer.schedule(() -> { - if (!syncFuture.isDone()) { - // end complete the future exceptionally with a TimeoutException - syncFuture.completeExceptionally(new java.util.concurrent.TimeoutException( - "Sync timed out for " + table.getTableBasePath())); - syncFuture.cancel(true); // send interrupt signal to the executing worker thread - } - }, timeoutInSeconds, TimeUnit.SECONDS); + CompletableFuture syncFuture = + CompletableFuture.runAsync( + () -> { + conversionController.sync(conversionConfig, conversionSourceProvider); + }, + syncExecutor); + + delayer.schedule( + () -> { + if (!syncFuture.isDone()) { + // end complete the future exceptionally with a TimeoutException + syncFuture.completeExceptionally( + new java.util.concurrent.TimeoutException( + "Sync timed out for " + table.getTableBasePath())); + syncFuture.cancel(true); // send interrupt signal to the executing worker thread + } + }, + timeoutInSeconds, + TimeUnit.SECONDS); // wait for whichever happens first (completion or timeout exception) try { syncFuture.join(); } catch (java.util.concurrent.CompletionException e) { if (e.getCause() instanceof java.util.concurrent.TimeoutException) { - log.error("Sync timed out for {} after {} seconds", table.getTableBasePath(), timeoutInSeconds); + log.error( + "Sync timed out for {} after {} seconds", + table.getTableBasePath(), + timeoutInSeconds); } else { log.error("Error running sync for {}", table.getTableBasePath(), e.getCause()); } @@ -326,7 +337,11 @@ private static void runSync(CommandLine cmd) throws IOException { String icebergCatalogConfigpath = getValueFromConfig(cmd, ICEBERG_CATALOG_CONFIG_PATH); String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH); String conversionProviderConfigpath = getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH); - long timeoutInSeconds = getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION) == null || getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION).isEmpty() ? 0L : Long.parseLong(getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION)); + long timeoutInSeconds = + getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION) == null + || getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION).isEmpty() + ? 0L + : Long.parseLong(getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION)); DatasetConfig datasetConfig = getDatasetConfig(datasetConfigpath); CatalogConfig catalogConfig = getIcebergCatalogConfig(icebergCatalogConfigpath); Configuration hadoopConf = gethadoopConf(hadoopConfigpath); @@ -334,7 +349,12 @@ private static void runSync(CommandLine cmd) throws IOException { getConversionSourceProvider(conversionProviderConfigpath, datasetConfig, hadoopConf); List tableFormatList = datasetConfig.getTargetFormats(); syncTableMetadata( - datasetConfig, tableFormatList, catalogConfig, hadoopConf, conversionSourceProvider, timeoutInSeconds); + datasetConfig, + tableFormatList, + catalogConfig, + hadoopConf, + conversionSourceProvider, + timeoutInSeconds); } static byte[] getCustomConfigurations(String Configpath) throws IOException { From b249bcdb85423cb65e3a0e2e59e327b51843b838 Mon Sep 17 00:00:00 2001 From: Selim Soufargi Date: Tue, 19 May 2026 19:27:30 +0200 Subject: [PATCH 4/6] fixing CI --- .../test/java/org/apache/xtable/ITConversionController.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 2da3078b6..8dacceb8a 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -223,6 +223,9 @@ private ConversionSourceProvider getConversionSourceProvider(String sourceTab @MethodSource("generateTestParametersForFormatsSyncModesAndPartitioning") public void testVariousOperations( String sourceTableFormat, SyncMode syncMode, boolean isPartitioned) { + // Disable the lock manager and heartbeat threads for both Catalog and Table configurations + jsc.hadoopConfiguration().set("iceberg.catalog.lock-impl", ""); + jsc.hadoopConfiguration().set("iceberg.tables.hadoop.lock-impl", ""); String tableName = getTableName(); ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); List targetTableFormats = getOtherFormats(sourceTableFormat); From e50c55fcd69d34d4b377756e7555e9816e3b4ffc Mon Sep 17 00:00:00 2001 From: Selim Soufargi Date: Fri, 5 Jun 2026 21:15:04 +0200 Subject: [PATCH 5/6] guard long value, remove option help, fixedSingle Exec no cachedPool, submit() instead of delayer and runAsync + cancel(true), shutdown executor in finally --- .../apache/xtable/ITConversionController.java | 3 - .../org/apache/xtable/utilities/RunSync.java | 120 +++++++++--------- 2 files changed, 57 insertions(+), 66 deletions(-) diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 8dacceb8a..2da3078b6 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -223,9 +223,6 @@ private ConversionSourceProvider getConversionSourceProvider(String sourceTab @MethodSource("generateTestParametersForFormatsSyncModesAndPartitioning") public void testVariousOperations( String sourceTableFormat, SyncMode syncMode, boolean isPartitioned) { - // Disable the lock manager and heartbeat threads for both Catalog and Table configurations - jsc.hadoopConfiguration().set("iceberg.catalog.lock-impl", ""); - jsc.hadoopConfiguration().set("iceberg.tables.hadoop.lock-impl", ""); String tableName = getTableName(); ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); List targetTableFormats = getOtherFormats(sourceTableFormat); diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java index cfbb3d638..4aeaa6ff8 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -117,14 +116,12 @@ public class RunSync { "continuousModeInterval", true, "The interval in seconds to schedule the loop. Requires --continuousMode to be set. Defaults to 5 seconds.") - .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility") .addOption( SYNC_TIMEOUT_OPTION, "syncTimeout", true, "The maximum time in seconds allowed for a single table sync before timing out. Defaults to no timeout.") .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility"); - ; static SourceTable sourceTableBuilder( @NonNull DatasetConfig.Table table, @@ -172,70 +169,62 @@ static void syncTableMetadata( ConversionSourceProvider conversionSourceProvider, long timeoutInSeconds) { ConversionController conversionController = new ConversionController(hadoopConf); - java.util.concurrent.ExecutorService syncExecutor = Executors.newCachedThreadPool(); - java.util.concurrent.ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1); - for (DatasetConfig.Table table : datasetConfig.getDatasets()) { - log.info( - "Running sync for basePath {} for following table formats {}", - table.getTableBasePath(), - tableFormatList); - Properties sourceProperties = new Properties(); - if (table.getPartitionSpec() != null) { - sourceProperties.put( - HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec()); - } + // use a single-thread executor since tasks are processed sequentially + java.util.concurrent.ExecutorService syncExecutor = Executors.newSingleThreadExecutor(); + try { - SourceTable sourceTable = - sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties); - List targetTables = targetTableBuilder(table, catalogConfig, tableFormatList); - ConversionConfig conversionConfig = - ConversionConfig.builder() - .sourceTable(sourceTable) - .targetTables(targetTables) - .syncMode(SyncMode.INCREMENTAL) - .build(); - if (timeoutInSeconds > 0) { - CompletableFuture syncFuture = - CompletableFuture.runAsync( - () -> { - conversionController.sync(conversionConfig, conversionSourceProvider); - }, - syncExecutor); - - delayer.schedule( - () -> { - if (!syncFuture.isDone()) { - // end complete the future exceptionally with a TimeoutException - syncFuture.completeExceptionally( - new java.util.concurrent.TimeoutException( - "Sync timed out for " + table.getTableBasePath())); - syncFuture.cancel(true); // send interrupt signal to the executing worker thread - } - }, - timeoutInSeconds, - TimeUnit.SECONDS); - - // wait for whichever happens first (completion or timeout exception) - try { - syncFuture.join(); - } catch (java.util.concurrent.CompletionException e) { - if (e.getCause() instanceof java.util.concurrent.TimeoutException) { + for (DatasetConfig.Table table : datasetConfig.getDatasets()) { + log.info( + "Running sync for basePath {} for following table formats {}", + table.getTableBasePath(), + tableFormatList); + Properties sourceProperties = new Properties(); + if (table.getPartitionSpec() != null) { + sourceProperties.put( + HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec()); + } + + SourceTable sourceTable = + sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties); + List targetTables = targetTableBuilder(table, catalogConfig, tableFormatList); + ConversionConfig conversionConfig = + ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) + .syncMode(SyncMode.INCREMENTAL) + .build(); + if (timeoutInSeconds > 0) { + java.util.concurrent.Future standardFuture = + syncExecutor.submit( + () -> { + conversionController.sync(conversionConfig, conversionSourceProvider); + }); + try { + standardFuture.get(timeoutInSeconds, TimeUnit.SECONDS); + } catch (java.util.concurrent.TimeoutException e) { log.error( - "Sync timed out for {} after {} seconds", + "Sync timed out for {} after {} seconds. Triggering thread interrupt.", table.getTableBasePath(), timeoutInSeconds); - } else { + standardFuture.cancel(true); + } catch (java.util.concurrent.ExecutionException e) { log.error("Error running sync for {}", table.getTableBasePath(), e.getCause()); + } catch (InterruptedException e) { + log.error("Sync main runner thread was interrupted", e); + standardFuture.cancel(true); + Thread.currentThread().interrupt(); + } + } else { + // fallback to original synchronous behavior if no timeout option is provided + try { + conversionController.sync(conversionConfig, conversionSourceProvider); + } catch (Exception e) { + log.error("Error running sync for {}", table.getTableBasePath(), e); } - } - } else { - // fallback to original synchronous behavior if no timeout option is provided - try { - conversionController.sync(conversionConfig, conversionSourceProvider); - } catch (Exception e) { - log.error("Error running sync for {}", table.getTableBasePath(), e); } } + } finally { + syncExecutor.shutdownNow(); } } @@ -337,11 +326,16 @@ private static void runSync(CommandLine cmd) throws IOException { String icebergCatalogConfigpath = getValueFromConfig(cmd, ICEBERG_CATALOG_CONFIG_PATH); String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH); String conversionProviderConfigpath = getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH); + String rawTimeout = getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION); + if (rawTimeout != null && !rawTimeout.isEmpty()) { + com.google.common.base.Preconditions.checkArgument( + rawTimeout.matches("^\\d+$"), + "Invalid value for --%s: '%s' must be a valid non-negative integer.", + SYNC_TIMEOUT_OPTION, + rawTimeout); + } long timeoutInSeconds = - getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION) == null - || getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION).isEmpty() - ? 0L - : Long.parseLong(getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION)); + rawTimeout == null || rawTimeout.isEmpty() ? 0L : Long.parseLong(rawTimeout); DatasetConfig datasetConfig = getDatasetConfig(datasetConfigpath); CatalogConfig catalogConfig = getIcebergCatalogConfig(icebergCatalogConfigpath); Configuration hadoopConf = gethadoopConf(hadoopConfigpath); From 782198497145d06a63b84c641ce0afe760f930fe Mon Sep 17 00:00:00 2001 From: Selim Soufargi Date: Sat, 6 Jun 2026 01:33:19 +0200 Subject: [PATCH 6/6] single sync mode: test sync ending before and after timeout --- .../apache/xtable/utilities/ITRunSync.java | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java index f18ce867f..59d3768b6 100644 --- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java +++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java @@ -93,6 +93,53 @@ void testContinuousSyncMode(@TempDir Path tempDir) throws IOException { } } + @Test + void testSyncCompletesBeforeTimeout(@TempDir Path tempDir) throws IOException { + String tableName = "tiny-table"; + try (GenericTable table = + TestJavaHudiTable.forStandardSchema( + tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) { + table.insertRows(5); // small numRows for it to finish before the timeout does + File configFile = writeConfigFile(tempDir, table, tableName); + + String[] args = new String[] {"--datasetConfig", configFile.getPath(), "--syncTimeout", "60"}; + RunSync.main(args); + // check successful sync + Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata")); + waitForNumIcebergCommits(icebergMetadataPath, 3); + } + } + + @Test + void testSyncTimeoutFires(@TempDir Path tempDir) throws IOException { + String tableName = "large-table"; + try (GenericTable table = + TestJavaHudiTable.forStandardSchema( + tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) { + + table.insertRows(5000); + File configFile = writeConfigFile(tempDir, table, tableName); + + long startTime = System.currentTimeMillis(); + + String[] args = + new String[] { + "--datasetConfig", + configFile.getPath(), + "--syncTimeout", + "1" // rapid timeout compared to the large table + }; + RunSync.main(args); + + long durationInSeconds = (System.currentTimeMillis() - startTime) / 1000; + + org.junit.jupiter.api.Assertions.assertTrue( + durationInSeconds < 3, // being generous with 3 secs + "The sync should have cut off and stopped immediately around 1 second. Took: " + + durationInSeconds); + } + } + private static File writeConfigFile(Path tempDir, GenericTable table, String tableName) throws IOException { RunSync.DatasetConfig config =