Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class RunSync {
private static final String CONTINUOUS_MODE = "m";
private static final String CONTINUOUS_MODE_INTERVAL = "t";
private static final String HELP_OPTION = "h";
private static final String SYNC_TIMEOUT_OPTION = "timeout";

private static final Options OPTIONS =
new Options()
Expand Down Expand Up @@ -115,6 +116,11 @@ public class RunSync {
"continuousModeInterval",
true,
"The interval in seconds to schedule the loop. Requires --continuousMode to be set. Defaults to 5 seconds.")
.addOption(
SYNC_TIMEOUT_OPTION,
"syncTimeout",
true,
"The maximum time in seconds allowed for a single table sync before timing out. Defaults to no timeout.")
.addOption(HELP_OPTION, "help", false, "Displays help information to run this utility");

static SourceTable sourceTableBuilder(
Expand Down Expand Up @@ -155,38 +161,70 @@ static List<TargetTable> targetTableBuilder(
return targetTables;
}

static void syncTableMetdata(
static void syncTableMetadata(
DatasetConfig datasetConfig,
List<String> tableFormatList,
CatalogConfig catalogConfig,
Configuration hadoopConf,
ConversionSourceProvider conversionSourceProvider) {
ConversionSourceProvider conversionSourceProvider,
long timeoutInSeconds) {
ConversionController conversionController = new ConversionController(hadoopConf);
for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
log.info(
"Running sync for basePath {} for following table formats {}",
table.getTableBasePath(),
tableFormatList);
Properties sourceProperties = new Properties();
if (table.getPartitionSpec() != null) {
sourceProperties.put(
HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec());
}
// use a single-thread executor since tasks are processed sequentially
java.util.concurrent.ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
try {

SourceTable sourceTable =
sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties);
List<TargetTable> targetTables = targetTableBuilder(table, catalogConfig, tableFormatList);
ConversionConfig conversionConfig =
ConversionConfig.builder()
.sourceTable(sourceTable)
.targetTables(targetTables)
.syncMode(SyncMode.INCREMENTAL)
.build();
try {
conversionController.sync(conversionConfig, conversionSourceProvider);
} catch (Exception e) {
log.error("Error running sync for {}", table.getTableBasePath(), e);
for (DatasetConfig.Table table : datasetConfig.getDatasets()) {
log.info(
"Running sync for basePath {} for following table formats {}",
table.getTableBasePath(),
tableFormatList);
Properties sourceProperties = new Properties();
if (table.getPartitionSpec() != null) {
sourceProperties.put(
HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec());
}

SourceTable sourceTable =
sourceTableBuilder(table, catalogConfig, datasetConfig, sourceProperties);
List<TargetTable> targetTables = targetTableBuilder(table, catalogConfig, tableFormatList);
ConversionConfig conversionConfig =
ConversionConfig.builder()
.sourceTable(sourceTable)
.targetTables(targetTables)
.syncMode(SyncMode.INCREMENTAL)
.build();
if (timeoutInSeconds > 0) {
java.util.concurrent.Future<?> standardFuture =
syncExecutor.submit(
() -> {
conversionController.sync(conversionConfig, conversionSourceProvider);
});
try {
standardFuture.get(timeoutInSeconds, TimeUnit.SECONDS);
} catch (java.util.concurrent.TimeoutException e) {
log.error(
"Sync timed out for {} after {} seconds. Triggering thread interrupt.",
table.getTableBasePath(),
timeoutInSeconds);
standardFuture.cancel(true);
} catch (java.util.concurrent.ExecutionException e) {
log.error("Error running sync for {}", table.getTableBasePath(), e.getCause());
} catch (InterruptedException e) {
log.error("Sync main runner thread was interrupted", e);
standardFuture.cancel(true);
Thread.currentThread().interrupt();
}
} else {
// fallback to original synchronous behavior if no timeout option is provided
try {
conversionController.sync(conversionConfig, conversionSourceProvider);
} catch (Exception e) {
log.error("Error running sync for {}", table.getTableBasePath(), e);
}
}
}
} finally {
syncExecutor.shutdownNow();
}
}

Expand Down Expand Up @@ -288,14 +326,29 @@ private static void runSync(CommandLine cmd) throws IOException {
String icebergCatalogConfigpath = getValueFromConfig(cmd, ICEBERG_CATALOG_CONFIG_PATH);
String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH);
String conversionProviderConfigpath = getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH);
String rawTimeout = getValueFromConfig(cmd, SYNC_TIMEOUT_OPTION);
if (rawTimeout != null && !rawTimeout.isEmpty()) {
com.google.common.base.Preconditions.checkArgument(
rawTimeout.matches("^\\d+$"),
"Invalid value for --%s: '%s' must be a valid non-negative integer.",
SYNC_TIMEOUT_OPTION,
rawTimeout);
}
long timeoutInSeconds =
rawTimeout == null || rawTimeout.isEmpty() ? 0L : Long.parseLong(rawTimeout);
DatasetConfig datasetConfig = getDatasetConfig(datasetConfigpath);
CatalogConfig catalogConfig = getIcebergCatalogConfig(icebergCatalogConfigpath);
Configuration hadoopConf = gethadoopConf(hadoopConfigpath);
ConversionSourceProvider conversionSourceProvider =
getConversionSourceProvider(conversionProviderConfigpath, datasetConfig, hadoopConf);
List<String> tableFormatList = datasetConfig.getTargetFormats();
syncTableMetdata(
datasetConfig, tableFormatList, catalogConfig, hadoopConf, conversionSourceProvider);
syncTableMetadata(
datasetConfig,
tableFormatList,
catalogConfig,
hadoopConf,
conversionSourceProvider,
timeoutInSeconds);
}

static byte[] getCustomConfigurations(String Configpath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,53 @@ void testContinuousSyncMode(@TempDir Path tempDir) throws IOException {
}
}

@Test
void testSyncCompletesBeforeTimeout(@TempDir Path tempDir) throws IOException {
String tableName = "tiny-table";
try (GenericTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
table.insertRows(5); // small numRows for it to finish before the timeout does
File configFile = writeConfigFile(tempDir, table, tableName);

String[] args = new String[] {"--datasetConfig", configFile.getPath(), "--syncTimeout", "60"};
RunSync.main(args);
// check successful sync
Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
waitForNumIcebergCommits(icebergMetadataPath, 3);
}
}

@Test
void testSyncTimeoutFires(@TempDir Path tempDir) throws IOException {
String tableName = "large-table";
try (GenericTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {

table.insertRows(5000);
File configFile = writeConfigFile(tempDir, table, tableName);

long startTime = System.currentTimeMillis();

String[] args =
new String[] {
"--datasetConfig",
configFile.getPath(),
"--syncTimeout",
"1" // rapid timeout compared to the large table
};
RunSync.main(args);

long durationInSeconds = (System.currentTimeMillis() - startTime) / 1000;

org.junit.jupiter.api.Assertions.assertTrue(
durationInSeconds < 3, // being generous with 3 secs
"The sync should have cut off and stopped immediately around 1 second. Took: "
+ durationInSeconds);
}
}

private static File writeConfigFile(Path tempDir, GenericTable table, String tableName)
throws IOException {
RunSync.DatasetConfig config =
Expand Down