diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java index a685700e6..c65282aea 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsExtractor.java @@ -18,293 +18,69 @@ package org.apache.xtable.delta; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Queue; import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Getter; import lombok.NoArgsConstructor; -import lombok.Value; -import lombok.extern.log4j.Log4j2; - -import org.apache.commons.lang3.StringUtils; import org.apache.spark.sql.delta.actions.AddFile; -import com.fasterxml.jackson.annotation.JsonAnySetter; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import org.apache.xtable.collectors.CustomCollectors; -import org.apache.xtable.model.exception.ParseException; import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.stat.FileStats; -import org.apache.xtable.model.stat.Range; /** - * DeltaStatsExtractor extracts column stats and also responsible for their serialization leveraging - * {@link DeltaValueConverter}. + * Delta Standalone stats extractor - delegates to {@link DeltaStatsUtils} for shared logic. 
+ * + * @deprecated This class is a thin wrapper around DeltaStatsUtils. Consider using DeltaStatsUtils + * directly. */ -@Log4j2 @NoArgsConstructor(access = AccessLevel.PRIVATE) public class DeltaStatsExtractor { - private static final Set FIELD_TYPES_WITH_STATS_SUPPORT = - new HashSet<>( - Arrays.asList( - InternalType.BOOLEAN, - InternalType.DATE, - InternalType.DECIMAL, - InternalType.DOUBLE, - InternalType.INT, - InternalType.LONG, - InternalType.FLOAT, - InternalType.STRING, - InternalType.TIMESTAMP, - InternalType.TIMESTAMP_NTZ)); - private static final DeltaStatsExtractor INSTANCE = new DeltaStatsExtractor(); - private static final ObjectMapper MAPPER = new ObjectMapper(); - - /* this data structure collects type names of all unrecognized Delta Lake stats. For instance - data file stats in presence of delete vectors would contain 'tightBounds' stat which is - currently not handled by XTable */ - private final Set unsupportedStats = new HashSet<>(); - public static DeltaStatsExtractor getInstance() { return INSTANCE; } + /** + * Converts XTable column statistics to Delta format JSON. 
+ * + * @param schema the table schema + * @param numRecords the number of records + * @param columnStats the column statistics + * @return JSON string in Delta format + * @throws JsonProcessingException if serialization fails + */ public String convertStatsToDeltaFormat( InternalSchema schema, long numRecords, List columnStats) throws JsonProcessingException { - DeltaStats.DeltaStatsBuilder deltaStatsBuilder = DeltaStats.builder(); - deltaStatsBuilder.numRecords(numRecords); - if (columnStats == null) { - return MAPPER.writeValueAsString(deltaStatsBuilder.build()); - } - Set validPaths = getPathsFromStructSchemaForMinAndMaxStats(schema); - List validColumnStats = - columnStats.stream() - .filter(stat -> validPaths.contains(stat.getField().getPath())) - .collect(Collectors.toList()); - DeltaStats deltaStats = - deltaStatsBuilder - .minValues(getMinValues(validColumnStats)) - .maxValues(getMaxValues(validColumnStats)) - .nullCount(getNullCount(validColumnStats)) - .build(); - return MAPPER.writeValueAsString(deltaStats); - } - - private Set getPathsFromStructSchemaForMinAndMaxStats(InternalSchema schema) { - return schema.getAllFields().stream() - .filter( - field -> { - InternalType type = field.getSchema().getDataType(); - return FIELD_TYPES_WITH_STATS_SUPPORT.contains(type); - }) - .map(InternalField::getPath) - .collect(Collectors.toSet()); - } - - private Map getMinValues(List validColumnStats) { - return getValues(validColumnStats, columnStat -> columnStat.getRange().getMinValue()); - } - - private Map getMaxValues(List validColumnStats) { - return getValues(validColumnStats, columnStat -> columnStat.getRange().getMaxValue()); - } - - private Map getValues( - List validColumnStats, Function valueExtractor) { - Map jsonObject = new HashMap<>(); - validColumnStats.forEach( - columnStat -> { - InternalField field = columnStat.getField(); - String[] pathParts = field.getPathParts(); - insertValueAtPath( - jsonObject, - pathParts, - 
DeltaValueConverter.convertToDeltaColumnStatValue( - valueExtractor.apply(columnStat), field.getSchema())); - }); - return jsonObject; - } - - private Map getNullCount(List validColumnStats) { - // TODO: Additional work needed to track nulls maps & arrays. - Map jsonObject = new HashMap<>(); - validColumnStats.forEach( - columnStat -> { - String[] pathParts = columnStat.getField().getPathParts(); - insertValueAtPath(jsonObject, pathParts, columnStat.getNumNulls()); - }); - return jsonObject; - } - - private void insertValueAtPath(Map jsonObject, String[] pathParts, Object value) { - if (pathParts == null || pathParts.length == 0) { - return; - } - Map currObject = jsonObject; - for (int i = 0; i < pathParts.length; i++) { - String part = pathParts[i]; - if (i == pathParts.length - 1) { - currObject.put(part, value); - } else { - if (!currObject.containsKey(part)) { - currObject.put(part, new HashMap()); - } - try { - currObject = (HashMap) currObject.get(part); - } catch (ClassCastException e) { - throw new RuntimeException( - String.format( - "Cannot cast to hashmap while inserting stats at path %s", - String.join("->", pathParts)), - e); - } - } - } - } - - public FileStats getColumnStatsForFile(AddFile addFile, List fields) { - if (StringUtils.isEmpty(addFile.stats())) { - return FileStats.builder().columnStats(Collections.emptyList()).numRecords(0).build(); - } - // TODO: Additional work needed to track maps & arrays. 
- try { - DeltaStats deltaStats = MAPPER.readValue(addFile.stats(), DeltaStats.class); - collectUnsupportedStats(deltaStats.getAdditionalStats()); - - Map fieldPathToMaxValue = flattenStatMap(deltaStats.getMaxValues()); - Map fieldPathToMinValue = flattenStatMap(deltaStats.getMinValues()); - Map fieldPathToNullCount = flattenStatMap(deltaStats.getNullCount()); - List columnStats = - fields.stream() - .filter(field -> fieldPathToMaxValue.containsKey(field.getPath())) - .map( - field -> { - String fieldPath = field.getPath(); - Object minValue = - DeltaValueConverter.convertFromDeltaColumnStatValue( - fieldPathToMinValue.get(fieldPath), field.getSchema()); - Object maxValue = - DeltaValueConverter.convertFromDeltaColumnStatValue( - fieldPathToMaxValue.get(fieldPath), field.getSchema()); - Number nullCount = (Number) fieldPathToNullCount.get(fieldPath); - Range range = Range.vector(minValue, maxValue); - return ColumnStat.builder() - .field(field) - .numValues(deltaStats.getNumRecords()) - .numNulls(nullCount.longValue()) - .range(range) - .build(); - }) - .collect(CustomCollectors.toList(fields.size())); - return FileStats.builder() - .columnStats(columnStats) - .numRecords(deltaStats.getNumRecords()) - .build(); - } catch (IOException ex) { - throw new ParseException("Unable to parse stats json", ex); - } - } - - private void collectUnsupportedStats(Map additionalStats) { - if (additionalStats == null || additionalStats.isEmpty()) { - return; - } - - additionalStats.keySet().stream() - .filter(key -> !unsupportedStats.contains(key)) - .forEach( - key -> { - log.info("Unrecognized/unsupported Delta data file stat: {}", key); - unsupportedStats.add(key); - }); + return DeltaStatsUtils.convertStatsToDeltaFormat(schema, numRecords, columnStats); } /** - * Takes the input map which represents a json object and flattens it. + * Extracts column statistics from Delta AddFile. 
* - * @param statMap input json map - * @return map with keys representing the dot-path for the field + * @param addFile the Delta AddFile action + * @param fields the fields to extract stats for + * @return FileStats containing column statistics */ - private Map flattenStatMap(Map statMap) { - Map result = new HashMap<>(); - Queue statFieldQueue = new ArrayDeque<>(); - statFieldQueue.add(StatField.of("", statMap)); - while (!statFieldQueue.isEmpty()) { - StatField statField = statFieldQueue.poll(); - String prefix = statField.getParentPath().isEmpty() ? "" : statField.getParentPath() + "."; - statField - .getValues() - .forEach( - (fieldName, value) -> { - String fullName = prefix + fieldName; - if (value instanceof Map) { - statFieldQueue.add(StatField.of(fullName, (Map) value)); - } else { - result.put(fullName, value); - } - }); - } - return result; + public FileStats getColumnStatsForFile(AddFile addFile, List fields) { + return DeltaStatsUtils.parseColumnStatsFromJson(addFile.stats(), fields); } /** - * Returns the names of all unsupported stats that have been discovered during the parsing of - * Delta Lake stats. + * Returns unsupported stats discovered during parsing. 
* - * @return set of unsupported stats + * @return set of unsupported stat names */ @VisibleForTesting Set getUnsupportedStats() { - return Collections.unmodifiableSet(unsupportedStats); - } - - @Builder - @Value - private static class DeltaStats { - long numRecords; - Map minValues; - Map maxValues; - Map nullCount; - - /* this is a catch-all for any additional stats that are not explicitly handled */ - @JsonIgnore - @Getter(lazy = true) - Map additionalStats = new HashMap<>(); - - @JsonAnySetter - public void setAdditionalStat(String key, Object value) { - getAdditionalStats().put(key, value); - } - } - - @Value - @AllArgsConstructor(staticName = "of") - private static class StatField { - String parentPath; - Map values; + return DeltaStatsUtils.getUnsupportedStats(); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsUtils.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsUtils.java new file mode 100644 index 000000000..299a87216 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaStatsUtils.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.delta; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import lombok.AllArgsConstructor; +import lombok.Value; +import lombok.extern.log4j.Log4j2; + +import org.apache.commons.lang3.StringUtils; + +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; + +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.model.exception.ParseException; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.FileStats; +import org.apache.xtable.model.stat.Range; + +/** + * Shared utility for Delta stats conversion between XTable internal format and Delta format. Used + * by both Delta Standalone and Delta Kernel implementations. + */ +@Log4j2 +public class DeltaStatsUtils { + private static final Set FIELD_TYPES_WITH_STATS_SUPPORT = + new HashSet<>( + Arrays.asList( + InternalType.BOOLEAN, + InternalType.DATE, + InternalType.DECIMAL, + InternalType.DOUBLE, + InternalType.INT, + InternalType.LONG, + InternalType.FLOAT, + InternalType.STRING, + InternalType.TIMESTAMP, + InternalType.TIMESTAMP_NTZ)); + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + /* this data structure collects type names of all unrecognized Delta Lake stats. 
For instance + data file stats in presence of delete vectors would contain 'tightBounds' stat which is + currently not handled by XTable */ + private static final Set unsupportedStats = new HashSet<>(); + + private DeltaStatsUtils() { + // Utility class, prevent instantiation + } + + /** + * Converts XTable internal column statistics to Delta format JSON string. + * + * @param schema the table schema + * @param numRecords the number of records in the file + * @param columnStats the list of column statistics + * @return JSON string in Delta stats format + * @throws JsonProcessingException if serialization fails + */ + public static String convertStatsToDeltaFormat( + InternalSchema schema, long numRecords, List columnStats) + throws JsonProcessingException { + DeltaStats.DeltaStatsBuilder deltaStatsBuilder = DeltaStats.builder(); + deltaStatsBuilder.numRecords(numRecords); + if (columnStats == null) { + return MAPPER.writeValueAsString(deltaStatsBuilder.build()); + } + Set validPaths = getPathsFromStructSchemaForMinAndMaxStats(schema); + List validColumnStats = + columnStats.stream() + .filter(stat -> validPaths.contains(stat.getField().getPath())) + .collect(Collectors.toList()); + DeltaStats deltaStats = + deltaStatsBuilder + .minValues(getMinValues(validColumnStats)) + .maxValues(getMaxValues(validColumnStats)) + .nullCount(getNullCount(validColumnStats)) + .build(); + return MAPPER.writeValueAsString(deltaStats); + } + + /** + * Parses Delta stats JSON and extracts column statistics for the specified fields. 
+ * + * @param statsJson the Delta stats JSON string + * @param fields the fields to extract stats for + * @return FileStats containing column statistics and record count + * @throws ParseException if JSON parsing fails + */ + public static FileStats parseColumnStatsFromJson(String statsJson, List fields) { + if (StringUtils.isEmpty(statsJson)) { + return FileStats.builder().columnStats(Collections.emptyList()).numRecords(0).build(); + } + + try { + DeltaStats deltaStats = MAPPER.readValue(statsJson, DeltaStats.class); + + collectUnsupportedStats(deltaStats.getAdditionalStats()); + + Map fieldPathToMaxValue = flattenStatMap(deltaStats.getMaxValues()); + Map fieldPathToMinValue = flattenStatMap(deltaStats.getMinValues()); + Map fieldPathToNullCount = flattenStatMap(deltaStats.getNullCount()); + + List columnStats = + fields.stream() + .filter(field -> fieldPathToMaxValue.containsKey(field.getPath())) + .map( + field -> { + String fieldPath = field.getPath(); + Object minRaw = fieldPathToMinValue.get(fieldPath); + Object maxRaw = fieldPathToMaxValue.get(fieldPath); + Object nullCountRaw = fieldPathToNullCount.get(fieldPath); + + Object minValue = + minRaw != null + ? DeltaValueConverter.convertFromDeltaColumnStatValue( + minRaw, field.getSchema()) + : null; + Object maxValue = + maxRaw != null + ? DeltaValueConverter.convertFromDeltaColumnStatValue( + maxRaw, field.getSchema()) + : null; + long nullCount = + nullCountRaw instanceof Number ? 
((Number) nullCountRaw).longValue() : 0; + + Range range = Range.vector(minValue, maxValue); + return ColumnStat.builder() + .field(field) + .numValues(deltaStats.getNumRecords()) + .numNulls(nullCount) + .range(range) + .build(); + }) + .collect(CustomCollectors.toList(fields.size())); + + return FileStats.builder() + .columnStats(columnStats) + .numRecords(deltaStats.getNumRecords()) + .build(); + } catch (IOException ex) { + throw new ParseException("Unable to parse stats json", ex); + } + } + + private static Set getPathsFromStructSchemaForMinAndMaxStats(InternalSchema schema) { + return schema.getAllFields().stream() + .filter( + field -> { + InternalType type = field.getSchema().getDataType(); + return FIELD_TYPES_WITH_STATS_SUPPORT.contains(type); + }) + .map(InternalField::getPath) + .collect(Collectors.toSet()); + } + + private static Map getMinValues(List validColumnStats) { + return getValues(validColumnStats, columnStat -> columnStat.getRange().getMinValue()); + } + + private static Map getMaxValues(List validColumnStats) { + return getValues(validColumnStats, columnStat -> columnStat.getRange().getMaxValue()); + } + + private static Map getValues( + List validColumnStats, Function valueExtractor) { + Map jsonObject = new HashMap<>(); + validColumnStats.forEach( + columnStat -> { + InternalField field = columnStat.getField(); + String[] pathParts = field.getPathParts(); + insertValueAtPath( + jsonObject, + pathParts, + DeltaValueConverter.convertToDeltaColumnStatValue( + valueExtractor.apply(columnStat), field.getSchema())); + }); + return jsonObject; + } + + private static Map getNullCount(List validColumnStats) { + // TODO: Additional work needed to track nulls maps & arrays. 
+ Map jsonObject = new HashMap<>(); + validColumnStats.forEach( + columnStat -> { + String[] pathParts = columnStat.getField().getPathParts(); + insertValueAtPath(jsonObject, pathParts, columnStat.getNumNulls()); + }); + return jsonObject; + } + + private static void insertValueAtPath( + Map jsonObject, String[] pathParts, Object value) { + if (pathParts == null || pathParts.length == 0) { + return; + } + Map currObject = jsonObject; + for (int i = 0; i < pathParts.length; i++) { + String part = pathParts[i]; + if (i == pathParts.length - 1) { + currObject.put(part, value); + } else { + if (!currObject.containsKey(part)) { + currObject.put(part, new HashMap()); + } + try { + currObject = (HashMap) currObject.get(part); + } catch (ClassCastException e) { + throw new RuntimeException( + String.format( + "Cannot cast to hashmap while inserting stats at path %s", + String.join("->", pathParts)), + e); + } + } + } + } + + private static void collectUnsupportedStats(Map additionalStats) { + if (additionalStats == null || additionalStats.isEmpty()) { + return; + } + + additionalStats.keySet().stream() + .filter(key -> !unsupportedStats.contains(key)) + .forEach( + key -> { + log.info("Unrecognized/unsupported Delta data file stat: {}", key); + unsupportedStats.add(key); + }); + } + + /** + * Takes the input map which represents a json object and flattens it. + * + * @param statMap input json map + * @return map with keys representing the dot-path for the field + */ + private static Map flattenStatMap(Map statMap) { + Map result = new HashMap<>(); + // Return empty map if input is null + if (statMap == null) { + return result; + } + Queue statFieldQueue = new ArrayDeque<>(); + statFieldQueue.add(StatField.of("", statMap)); + while (!statFieldQueue.isEmpty()) { + StatField statField = statFieldQueue.poll(); + String prefix = statField.getParentPath().isEmpty() ? 
"" : statField.getParentPath() + "."; + Map values = statField.getValues(); + if (values != null) { + values.forEach( + (fieldName, value) -> { + String fullName = prefix + fieldName; + if (value instanceof Map) { + statFieldQueue.add(StatField.of(fullName, (Map) value)); + } else { + result.put(fullName, value); + } + }); + } + } + return result; + } + + /** + * Returns the names of all unsupported stats that have been discovered during the parsing of + * Delta Lake stats. + * + * @return set of unsupported stats + */ + @VisibleForTesting + public static Set getUnsupportedStats() { + return Collections.unmodifiableSet(unsupportedStats); + } + + /** Internal representation of Delta stats JSON structure. */ + @lombok.Builder + @Value + static class DeltaStats { + long numRecords; + Map minValues; + Map maxValues; + Map nullCount; + + /* this is a catch-all for any additional stats that are not explicitly handled */ + @JsonIgnore + @lombok.Getter(lazy = true) + Map additionalStats = new HashMap<>(); + + @JsonAnySetter + public void setAdditionalStat(String key, Object value) { + getAdditionalStats().put(key, value); + } + } + + /** Helper class for flattening nested stat maps. */ + @Value + @AllArgsConstructor(staticName = "of") + static class StatField { + String parentPath; + Map values; + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelActionsConverter.java index c39785d37..40e910c3b 100644 --- a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelActionsConverter.java +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelActionsConverter.java @@ -50,6 +50,47 @@ public static DeltaKernelActionsConverter getInstance() { return INSTANCE; } + /** + * Converts AddFile to InternalDataFile using cached table base path (most efficient). 
+ * + * @param tableBasePath cached table base path from table.getPath(engine) + */ + public InternalDataFile convertAddActionToInternalDataFile( + AddFile addFile, + String tableBasePath, + FileFormat fileFormat, + List partitionFields, + List fields, + boolean includeColumnStats, + DeltaKernelPartitionExtractor partitionExtractor, + DeltaKernelStatsExtractor fileStatsExtractor, + Map partitionValues) { + FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(addFile, fields); + List columnStats = + includeColumnStats ? fileStats.getColumnStats() : Collections.emptyList(); + long recordCount = fileStats.getNumRecords(); + + Map scalaMap = partitionValues; + + return InternalDataFile.builder() + .physicalPath(getFullPathToFile(addFile.getPath(), tableBasePath)) + .fileFormat(fileFormat) + .fileSizeBytes(addFile.getSize()) + .lastModified(addFile.getModificationTime()) + .partitionValues(partitionExtractor.partitionValueExtraction(scalaMap, partitionFields)) + .columnStats(columnStats) + .recordCount(recordCount) + .build(); + } + + /** + * Converts AddFile to InternalDataFile (deprecated - inefficient). + * + * @deprecated Use {@link #convertAddActionToInternalDataFile(AddFile, String, FileFormat, List, + * List, boolean, DeltaKernelPartitionExtractor, DeltaKernelStatsExtractor, Map)} with cached + * tableBasePath instead to avoid creating new Engine per call + */ + @Deprecated public InternalDataFile convertAddActionToInternalDataFile( AddFile addFile, Table table, @@ -60,6 +101,7 @@ public InternalDataFile convertAddActionToInternalDataFile( DeltaKernelPartitionExtractor partitionExtractor, DeltaKernelStatsExtractor fileStatsExtractor, Map partitionValues) { + // WARNING: Creates new Configuration + Engine per call - inefficient! FileStats fileStats = fileStatsExtractor.getColumnStatsForFile(addFile, fields); List columnStats = includeColumnStats ? 
fileStats.getColumnStats() : Collections.emptyList(); @@ -68,7 +110,7 @@ public InternalDataFile convertAddActionToInternalDataFile( Map scalaMap = partitionValues; return InternalDataFile.builder() - .physicalPath(getFullPathToFile(addFile.getPath(), table)) + .physicalPath(getFullPathToFile(addFile.getPath(), table)) // Inefficient! .fileFormat(fileFormat) .fileSizeBytes(addFile.getSize()) .lastModified(addFile.getModificationTime()) @@ -78,6 +120,35 @@ public InternalDataFile convertAddActionToInternalDataFile( .build(); } + /** + * Converts RemoveFile to InternalDataFile using cached table base path (most efficient). + * + * @param tableBasePath cached table base path from table.getPath(engine) + */ + public InternalDataFile convertRemoveActionToInternalDataFile( + RemoveFile removeFile, + String tableBasePath, + FileFormat fileFormat, + List partitionFields, + DeltaKernelPartitionExtractor partitionExtractor, + Map partitionValues) { + Map scalaMap = partitionValues; + + return InternalDataFile.builder() + .physicalPath(getFullPathToFile(removeFile.getPath(), tableBasePath)) + .fileFormat(fileFormat) + .partitionValues(partitionExtractor.partitionValueExtraction(scalaMap, partitionFields)) + .build(); + } + + /** + * Converts RemoveFile to InternalDataFile (deprecated - inefficient). + * + * @deprecated Use {@link #convertRemoveActionToInternalDataFile(RemoveFile, String, FileFormat, + * List, DeltaKernelPartitionExtractor, Map)} with cached tableBasePath instead to avoid + * creating new Engine per call + */ + @Deprecated public InternalDataFile convertRemoveActionToInternalDataFile( RemoveFile removeFile, Table table, @@ -85,10 +156,11 @@ public InternalDataFile convertRemoveActionToInternalDataFile( List partitionFields, DeltaKernelPartitionExtractor partitionExtractor, Map partitionValues) { + // WARNING: Creates new Configuration + Engine per call - inefficient! 
Map scalaMap = partitionValues; return InternalDataFile.builder() - .physicalPath(getFullPathToFile(removeFile.getPath(), table)) + .physicalPath(getFullPathToFile(removeFile.getPath(), table)) // Inefficient! .fileFormat(fileFormat) .partitionValues(partitionExtractor.partitionValueExtraction(scalaMap, partitionFields)) .build(); @@ -104,10 +176,50 @@ public FileFormat convertToFileFormat(String provider) { String.format("delta file format %s is not recognized", provider)); } + /** + * Constructs the full path to a file, handling both relative and absolute paths. + * + *

DEPRECATED: This method creates a new Configuration and Engine on every + * call, which is extremely inefficient. Use {@link #getFullPathToFile(String, Engine, Table)} or + * {@link #getFullPathToFile(String, String)} instead. + * + * @param dataFilePath the data file path (relative or absolute) + * @param table the Delta table + * @return the full absolute path to the file + * @deprecated Use {@link #getFullPathToFile(String, Engine, Table)} to avoid creating new + * Engine/Configuration on every call, or {@link #getFullPathToFile(String, String)} if you + * already have the base path + */ + @Deprecated static String getFullPathToFile(String dataFilePath, Table table) { + // WARNING: This creates new Configuration + Engine per call - severe performance issue! + // Kept for backwards compatibility but should not be used in hot paths Configuration hadoopConf = new Configuration(); Engine myEngine = DefaultEngine.create(hadoopConf); - String tableBasePath = table.getPath(myEngine); + return getFullPathToFile(dataFilePath, myEngine, table); + } + + /** + * Constructs the full path to a file using a provided Engine (efficient). + * + * @param dataFilePath the data file path (relative or absolute) + * @param engine the Delta Kernel engine to use for path resolution + * @param table the Delta table + * @return the full absolute path to the file + */ + static String getFullPathToFile(String dataFilePath, Engine engine, Table table) { + String tableBasePath = table.getPath(engine); + return getFullPathToFile(dataFilePath, tableBasePath); + } + + /** + * Constructs the full path to a file using a provided base path (most efficient). 
+ * + * @param dataFilePath the data file path (relative or absolute) + * @param tableBasePath the table base path + * @return the full absolute path to the file + */ + static String getFullPathToFile(String dataFilePath, String tableBasePath) { if (dataFilePath.startsWith(tableBasePath)) { return dataFilePath; } diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java index fa088f087..5c1b4ceeb 100644 --- a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionSource.java @@ -119,6 +119,9 @@ public TableChange getTableChangeForCommit(Long versionNumber) { String provider = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider(); FileFormat fileFormat = actionsConverter.convertToFileFormat(provider); + // Cache table base path once to avoid creating new Engine per file + String tableBasePath = table.getPath(engine); + List actionsForVersion = getChangesState().getActionsForVersion(versionNumber); for (RowBackedAction action : actionsForVersion) { @@ -128,7 +131,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) { InternalDataFile dataFile = actionsConverter.convertAddActionToInternalDataFile( addFile, - table, + tableBasePath, // Use cached base path fileFormat, tableAtVersion.getPartitioningFields(), tableAtVersion.getReadSchema().getFields(), @@ -147,7 +150,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) { InternalDataFile dataFile = actionsConverter.convertRemoveActionToInternalDataFile( removeFile, - table, + tableBasePath, // Use cached base path fileFormat, tableAtVersion.getPartitioningFields(), DeltaKernelPartitionExtractor.getInstance(), diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionTarget.java 
b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionTarget.java new file mode 100644 index 000000000..bcb97733c --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelConversionTarget.java @@ -0,0 +1,498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.kernel; + +import static io.delta.kernel.utils.CloseableIterable.inMemoryIterable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import lombok.Getter; +import lombok.extern.log4j.Log4j2; + +import org.apache.hadoop.conf.Configuration; + +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import com.google.common.annotations.VisibleForTesting; + +import io.delta.kernel.Operation; +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.Transaction; +import io.delta.kernel.TransactionBuilder; +import io.delta.kernel.TransactionCommitResult; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.TableNotFoundException; +import io.delta.kernel.hook.PostCommitHook; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.RowBackedAction; +import io.delta.kernel.internal.actions.SingleAction; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterable; +import io.delta.kernel.utils.CloseableIterator; + +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.exception.UpdateException; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.metadata.TableSyncMetadata; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.InternalFilesDiff; +import org.apache.xtable.model.storage.PartitionFileGroup; +import 
org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.spi.sync.ConversionTarget; + +/** + * Implementation of {@link ConversionTarget} for Delta Lake using the Delta Kernel API. + * + *

This implementation uses Delta Kernel (io.delta.kernel) instead of Delta Standalone for write + * operations, providing better compatibility with cloud and distributed storage (S3, GCS, Azure + * Blob Storage, HDFS) and improved support for Delta Lake 3.x features. + * + *

Initialization: This class supports two initialization patterns: + * + *

    + *
  • Factory/ServiceLoader Pattern: Use the no-arg constructor followed by + * {@link #init(TargetTable, Configuration)}. This is used by {@link + * org.apache.xtable.conversion.ConversionTargetFactory}. + *
  • Direct Testing Pattern: Use the constructor with {@link TargetTable} and + * {@link Engine} parameters for direct instantiation with custom dependencies in tests. + *
+ * + *

Important: Do not mix initialization patterns. If you use the parameterized + * constructor, do not call {@link #init(TargetTable, Configuration)} afterward, as it will + * overwrite the custom Engine. + * + *

Exception Handling: This implementation only catches {@link + * io.delta.kernel.exceptions.TableNotFoundException} when checking for table existence, allowing + * other exceptions (network errors, permission issues, corrupted metadata) to propagate rather than + * being silently masked. This ensures real errors are visible and fail fast. + * + *

Known Limitations: + * + *

    + *
  • Commit Tags: Delta Kernel 4.0.0 does not support commit tags in commitInfo + * (e.g., XTABLE_METADATA tags). As a result, {@link #getTargetCommitIdentifier(String)} throws + * {@code NotSupportedException} instead of performing source-to-target commit identifier mapping. + * Tracked in: https://github.com/apache/incubator-xtable/issues/819
  • Schema Evolution: Schema evolution on existing tables is not supported: + * Delta Kernel's {@code withSchema} only applies during CREATE_TABLE operations, so schema + * changes on an already-synced table are not propagated to the Delta table + * (see https://github.com/delta-io/delta/issues/4305).
  • Internal API Usage: This implementation casts to the internal class + * {@code SnapshotImpl} to access table metadata, as Delta Kernel 4.0.0 + * lacks a public API for this operation. This cast is brittle and may break on version + * upgrades. Public API alternatives should be used when available.
+ * + *

Implementation Choice: Delta Kernel API was chosen over Delta Standalone to: + * + *

    + *
  • Support newer Delta Lake features and protocol versions + *
  • Align with the Delta Lake community's direction (Kernel is the recommended API) + *
  • Reduce dependency on Spark-specific implementations + *
+ * + * @see ConversionTarget + * @see io.delta.kernel.Table + * @see io.delta.kernel.Transaction + */ +@Log4j2 +public class DeltaKernelConversionTarget implements ConversionTarget { + private static final String DELTA_LOG_RETENTION_DURATION = "delta.logRetentionDuration"; + + private DeltaKernelSchemaExtractor schemaExtractor; + private DeltaKernelPartitionExtractor partitionExtractor; + private DeltaKernelDataFileUpdatesExtractor dataKernelFileUpdatesExtractor; + + private String basePath; + private long logRetentionInHours; + private DeltaKernelConversionTarget.TransactionState transactionState; + private Engine engine; + + /** + * No-arg constructor for ServiceLoader instantiation. Must call {@link #init(TargetTable, + * Configuration)} before use. + */ + public DeltaKernelConversionTarget() {} + + /** + * Creates a fully initialized DeltaKernelConversionTarget with custom Engine. Typically used in + * tests. Do not call {@link #init(TargetTable, Configuration)} after this. + * + * @param targetTable the target table configuration + * @param engine custom Delta Kernel engine instance + */ + public DeltaKernelConversionTarget(TargetTable targetTable, Engine engine) { + this( + targetTable.getBasePath(), + targetTable.getMetadataRetention().toHours(), + engine, + DeltaKernelSchemaExtractor.getInstance(), + DeltaKernelPartitionExtractor.getInstance(), + DeltaKernelDataFileUpdatesExtractor.builder() + .engine(engine) + .basePath(targetTable.getBasePath()) + // Column statistics are not needed for conversion operations + .includeColumnStats(false) + .build()); + } + + @VisibleForTesting + DeltaKernelConversionTarget( + String tableDataPath, + long logRetentionInHours, + Engine engine, + DeltaKernelSchemaExtractor schemaExtractor, + DeltaKernelPartitionExtractor partitionExtractor, + DeltaKernelDataFileUpdatesExtractor dataKernelFileUpdatesExtractor) { + _init( + tableDataPath, + logRetentionInHours, + engine, + schemaExtractor, + partitionExtractor, + 
dataKernelFileUpdatesExtractor); + } + + /** + * Private initialization helper to avoid code duplication between constructor and init() paths. + */ + private void _init( + String tableDataPath, + long logRetentionInHours, + Engine engine, + DeltaKernelSchemaExtractor schemaExtractor, + DeltaKernelPartitionExtractor partitionExtractor, + DeltaKernelDataFileUpdatesExtractor dataKernelFileUpdatesExtractor) { + this.basePath = tableDataPath; + this.schemaExtractor = schemaExtractor; + this.partitionExtractor = partitionExtractor; + this.dataKernelFileUpdatesExtractor = dataKernelFileUpdatesExtractor; + this.engine = engine; + this.logRetentionInHours = logRetentionInHours; + } + + @Override + public void init(TargetTable targetTable, Configuration configuration) { + Engine engine = DefaultEngine.create(configuration); + + _init( + targetTable.getBasePath(), + targetTable.getMetadataRetention().toHours(), + engine, + DeltaKernelSchemaExtractor.getInstance(), + DeltaKernelPartitionExtractor.getInstance(), + DeltaKernelDataFileUpdatesExtractor.builder() + .engine(engine) + .basePath(targetTable.getBasePath()) + .includeColumnStats(true) + .build()); + } + + @Override + public void beginSync(InternalTable table) { + this.transactionState = + new DeltaKernelConversionTarget.TransactionState(engine, logRetentionInHours); + } + + @Override + public void syncSchema(InternalSchema schema) { + transactionState.setLatestSchema(schema); + } + + @Override + public void syncPartitionSpec(List partitionSpec) { + if (partitionSpec != null) { + Map spec = + partitionExtractor.convertToDeltaPartitionFormat(partitionSpec); + for (Map.Entry partitionEntry : spec.entrySet()) { + String partitionColumnName = partitionEntry.getKey(); + StructField partitionField = partitionEntry.getValue(); + + transactionState.addPartitionColumn(partitionColumnName); + if (partitionField != null + && transactionState.getLatestSchema().fields().stream() + .noneMatch(field -> 
field.getName().equals(partitionField.getName()))) { + // add generated columns to schema. + transactionState.addColumn(partitionField); + } + } + } + } + + @Override + public void syncMetadata(TableSyncMetadata metadata) { + transactionState.setMetadata(metadata); + } + + @Override + public void syncFilesForSnapshot(List partitionedDataFiles) { + Table table = Table.forPath(engine, basePath); + // Pass cached snapshot to avoid reloading it + transactionState.setActions( + dataKernelFileUpdatesExtractor.applySnapshot( + table, + partitionedDataFiles, + transactionState.getLatestSchemaInternal(), + transactionState.getCachedSnapshot())); + } + + @Override + public void syncFilesForDiff(InternalFilesDiff internalFilesDiff) { + // Use cached schema directly (already extracted during TransactionState initialization) + // For new tables, latestSchema is null and applyDiff will handle it appropriately + transactionState.setActions( + dataKernelFileUpdatesExtractor.applyDiff( + internalFilesDiff, + transactionState.getLatestSchemaInternal(), + basePath, + transactionState.getLatestSchema())); + } + + @Override + public void completeSync() { + transactionState.commitTransaction(); + transactionState = null; + } + + @Override + public Optional getTableMetadata() { + Table table = Table.forPath(engine, basePath); + Snapshot snapshot = table.getLatestSnapshot(engine); + + // WORKAROUND: Cast to SnapshotImpl (internal class) to access metadata configuration. + // Delta Kernel 4.0.0 does not provide a public API to access table metadata/configuration. + // This cast is brittle and may break on Kernel version upgrades. 
+ Metadata metadata = ((SnapshotImpl) snapshot).getMetadata(); + + // Get configuration from metadata + Map configuration = metadata.getConfiguration(); + String metadataJson = configuration.get(TableSyncMetadata.XTABLE_METADATA); + + return TableSyncMetadata.fromJson(metadataJson); + } + + @Override + public String getTableFormat() { + return TableFormat.DELTA; + } + + @Override + public Optional getTargetCommitIdentifier(String sourceIdentifier) { + // Delta Kernel 4.0.0 does not support commit tags in commitInfo, which are required for + // source-to-target commit identifier mapping. This limitation is documented in: + // https://github.com/delta-io/delta/issues/6167 + // XTable tracking issue: https://github.com/apache/incubator-xtable/issues/819 + // + // Unlike DeltaConversionTarget (which uses Delta Standalone with commit tag support), + // DeltaKernelConversionTarget cannot retrieve commit tags from Delta Kernel's API. + // Rather than silently scanning all commits (O(n) performance cost) and always returning + // empty, we explicitly throw an exception to indicate this feature is unsupported. + // + // When Delta Kernel adds commit tag support, this method can be reimplemented to: + // 1. Scan commit history using tableImpl.getChanges(engine, 0, currentVersion, actionSet) + // 2. Extract tags from CommitInfo.tags MapValue + // 3. Parse XTABLE_METADATA from tags and match sourceIdentifier + throw new NotSupportedException( + "Source-to-target commit identifier mapping is not supported in DeltaKernelConversionTarget. " + + "Delta Kernel 4.0.0 does not support commit tags in commitInfo. 
" + + "See: https://github.com/delta-io/delta/issues/6167"); + } + + private class TransactionState { + private final Engine engine; + private final long retentionInHours; + private final List partitionColumns; + @Getter private StructType latestSchema; + @Getter private InternalSchema latestSchemaInternal; + private TableSyncMetadata metadata; + private List actions; + + // Cache the snapshot to avoid multiple loads during a single sync cycle + // This snapshot is used for: schema extraction, table existence check, and file diff operations + private Snapshot cachedSnapshot; + private boolean tableExists; + + private TransactionState(Engine engine, long retentionInHours) { + this.engine = engine; + this.partitionColumns = new ArrayList<>(); + this.retentionInHours = retentionInHours; + + try { + Table table = Table.forPath(engine, basePath); + this.cachedSnapshot = table.getLatestSnapshot(engine); + this.latestSchema = cachedSnapshot.getSchema(); + this.tableExists = true; + } catch (TableNotFoundException e) { + // Expected: table doesn't exist yet on first sync + this.latestSchema = null; + this.cachedSnapshot = null; + this.tableExists = false; + } + // Let other exceptions propagate (network issues, permissions, corrupted metadata, etc.) + } + + /** + * Adds a partition column name to the list. Package-private to allow access from outer class. + */ + void addPartitionColumn(String columnName) { + partitionColumns.add(columnName); + } + + /** + * Gets the cached snapshot. Returns null if no snapshot was cached (new table). Package-private + * to allow access from outer class. + */ + Snapshot getCachedSnapshot() { + return cachedSnapshot; + } + + void setMetadata(TableSyncMetadata metadata) { + this.metadata = metadata; + } + + /** + * Sets the actions to be committed. Converts from Scala Seq to Java List for internal storage. 
+ */ + void setActions(Seq scalaActions) { + this.actions = JavaConverters.seqAsJavaList(scalaActions); + } + + private void addColumn(StructField field) { + latestSchema = latestSchema.add(field); + latestSchemaInternal = schemaExtractor.toInternalSchema(latestSchema); + } + + private void setLatestSchema(InternalSchema schema) { + this.latestSchemaInternal = schema; + this.latestSchema = schemaExtractor.fromInternalSchema(schema); + } + + private void commitTransaction() { + // Use cached table existence check instead of loading snapshot again + Operation operation = tableExists ? Operation.WRITE : Operation.CREATE_TABLE; + + Table table = Table.forPath(engine, basePath); + TransactionBuilder txnBuilder = + table.createTransactionBuilder(engine, "XTable Delta Sync", operation); + + // LIMITATION: Schema evolution for existing tables is NOT supported in Delta Kernel 4.0.0. + // The withSchema() method only works during CREATE_TABLE operations. For existing tables: + // - AddFile/RemoveFile actions are created using the old schema from existing snapshot + // - If source schema has evolved (columns added/removed/type changed), the Delta table + // will have mismatched metadata and data, causing query failures or incorrect results + // This is a known Delta Kernel limitation: https://github.com/delta-io/delta/issues/4305 + if (!tableExists) { + txnBuilder = txnBuilder.withSchema(engine, latestSchema); + + if (!partitionColumns.isEmpty()) { + txnBuilder = txnBuilder.withPartitionColumns(engine, partitionColumns); + } + } + + Map tableProperties = getConfigurationsForDeltaSync(); + txnBuilder = txnBuilder.withTableProperties(engine, tableProperties); + + Transaction txn = txnBuilder.build(engine); + List allActionRows = new ArrayList<>(); + + // Iterate through actions (Java List) and convert to Row format + for (RowBackedAction action : actions) { + + if (action instanceof AddFile) { + AddFile addFile = (AddFile) action; + Row wrappedRow = 
SingleAction.createAddFileSingleAction(addFile.toRow()); + allActionRows.add(wrappedRow); + } else if (action instanceof RemoveFile) { + RemoveFile removeFile = (RemoveFile) action; + Row wrappedRow = SingleAction.createRemoveFileSingleAction(removeFile.toRow()); + allActionRows.add(wrappedRow); + } + } + + CloseableIterator allActionsIterator = + new CloseableIterator() { + private int currentIndex = 0; + + @Override + public boolean hasNext() { + return currentIndex < allActionRows.size(); + } + + @Override + public Row next() { + return allActionRows.get(currentIndex++); + } + + @Override + public void close() {} + }; + + CloseableIterable dataActions = inMemoryIterable(allActionsIterator); + + try { + TransactionCommitResult result = txn.commit(engine, dataActions); + + // Execute PostCommitHooks to create checkpoints and _last_checkpoint metadata file + List hooks = result.getPostCommitHooks(); + if (hooks != null && !hooks.isEmpty()) { + for (PostCommitHook hook : hooks) { + try { + hook.threadSafeInvoke(engine); + } catch (Exception hookEx) { + // Post-commit hooks are optimizations; log but don't fail the transaction + log.warn("Post-commit hook failed but transaction succeeded", hookEx); + } + } + } + } catch (Exception e) { + throw new UpdateException("Failed to commit Delta Kernel transaction", e); + } + + // NOTE: Delta Kernel API limitations compared to Delta Standalone: + // - Commit tags (like XTABLE_METADATA in commitInfo.tags) are not yet supported + // - Operation type metadata (like DeltaOperations.Update) is simplified to + // Operation.WRITE/CREATE_TABLE + // - The commit timestamp is managed by Delta Kernel automatically + } + + private Map getConfigurationsForDeltaSync() { + Map configMap = new HashMap<>(); + // Delta Kernel will automatically upgrade protocol versions if/when features that require + // higher versions are used. 
Explicitly setting them via table properties is not supported + // in Delta Kernel 4.0.0 and causes UnknownConfiguration errors. + + configMap.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson()); + configMap.put( + DELTA_LOG_RETENTION_DURATION, String.format("interval %d hours", retentionInHours)); + + return configMap; + } + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileExtractor.java index 782732016..0ca90dffc 100644 --- a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileExtractor.java @@ -76,6 +76,7 @@ public class DeltaDataFileIterator implements DataFileIterator { private final CloseableIterator scanFiles; private final FileFormat fileFormat; private final Table table; + private final String tableBasePath; // Cached to avoid repeated Engine calls private final List fields; private final List partitionFields; private final boolean includeColumnStats; @@ -91,6 +92,7 @@ private DeltaDataFileIterator( boolean includeColumnStats) { this.includeColumnStats = includeColumnStats; this.table = table; + this.tableBasePath = table.getPath(engine); // Cache base path once this.fields = schema.getFields(); String provider = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider(); this.fileFormat = actionsConverter.convertToFileFormat(provider); @@ -159,7 +161,7 @@ private InternalDataFile computeNext() { return actionsConverter.convertAddActionToInternalDataFile( addFile, - table, + tableBasePath, // Use cached base path instead of Table fileFormat, partitionFields, fields, diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileUpdatesExtractor.java new file mode 100644 index 000000000..d875d98a6 --- 
/dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelDataFileUpdatesExtractor.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Stream; + +import lombok.Builder; + +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.data.MapValue; +import io.delta.kernel.data.Row; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.expressions.Column; +import io.delta.kernel.expressions.Literal; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.RowBackedAction; +import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.statistics.DataFileStatistics; +import io.delta.kernel.types.StructType; + +import org.apache.xtable.collectors.CustomCollectors; +import org.apache.xtable.exception.ReadException; +import 
org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.stat.ColumnStat; +import org.apache.xtable.model.stat.Range; +import org.apache.xtable.model.storage.FilesDiff; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFile; +import org.apache.xtable.model.storage.InternalFilesDiff; +import org.apache.xtable.model.storage.PartitionFileGroup; +import org.apache.xtable.paths.PathUtils; +import org.apache.xtable.spi.extractor.DataFileIterator; + +@Builder +public class DeltaKernelDataFileUpdatesExtractor { + @Builder.Default + private final DeltaKernelStatsExtractor deltaStatsExtractor = + DeltaKernelStatsExtractor.getInstance(); + + @Builder.Default + private final DeltaKernelPartitionExtractor deltaKernelPartitionExtractor = + DeltaKernelPartitionExtractor.getInstance(); + + @Builder.Default + private final DeltaKernelDataFileExtractor dataFileExtractor = + DeltaKernelDataFileExtractor.builder().build(); + + private final Engine engine; + private final String basePath; + private final boolean includeColumnStats; + + /** + * Applies snapshot changes, loading the snapshot fresh. + * + * @param table the Delta table + * @param partitionedDataFiles the new data files to sync + * @param tableSchema the internal schema + * @return sequence of Delta actions (AddFile/RemoveFile) + */ + public Seq applySnapshot( + Table table, List partitionedDataFiles, InternalSchema tableSchema) { + return applySnapshot(table, partitionedDataFiles, tableSchema, null); + } + + /** + * Applies snapshot changes with an optional cached snapshot to avoid redundant loads. 
+ * + * @param table the Delta table + * @param partitionedDataFiles the new data files to sync + * @param tableSchema the internal schema + * @param cachedSnapshot optional pre-loaded snapshot (null to load fresh) + * @return sequence of Delta actions (AddFile/RemoveFile) + */ + public Seq applySnapshot( + Table table, + List partitionedDataFiles, + InternalSchema tableSchema, + Snapshot cachedSnapshot) { + + // all files in the current delta snapshot are potential candidates for remove actions, i.e. if + // the file is not present in the new snapshot (addedFiles) then the file is considered removed + Map previousFiles = new HashMap<>(); + StructType physicalSchema; + + // Use cached snapshot if provided, otherwise load it + boolean tableExists = cachedSnapshot != null || checkTableExists(table); + + if (tableExists) { + Snapshot snapshot = cachedSnapshot != null ? cachedSnapshot : table.getLatestSnapshot(engine); + + // Reuse DeltaKernelDataFileExtractor to iterate through existing files + // This avoids duplicating the scan logic for reading Delta files + try (DataFileIterator fileIterator = + dataFileExtractor.iterator(snapshot, table, engine, tableSchema)) { + + while (fileIterator.hasNext()) { + InternalDataFile internalFile = fileIterator.next(); + + // Convert InternalDataFile back to AddFile to create RemoveFile action + AddFile addFile = + createAddFileAction(internalFile, tableSchema, basePath, snapshot.getSchema()); + RemoveFile removeFile = + new RemoveFile(addFile.toRemoveFileRow(false, Optional.of(snapshot.getVersion()))); + // Use optimized path construction with Engine (reuses existing engine, no new + // Configuration) + String fullPath = + DeltaKernelActionsConverter.getFullPathToFile(removeFile.getPath(), engine, table); + previousFiles.put(fullPath, removeFile); + } + } catch (Exception e) { + throw new ReadException("Failed to scan existing Delta files", e); + } + + physicalSchema = snapshot.getSchema(); + + } else { + + // Table doesn't exist 
yet - no previous files to remove + // Convert InternalSchema to StructType for physical schema + DeltaKernelSchemaExtractor schemaExtractor = DeltaKernelSchemaExtractor.getInstance(); + physicalSchema = schemaExtractor.fromInternalSchema(tableSchema); + } + + FilesDiff diff = + InternalFilesDiff.findNewAndRemovedFiles(partitionedDataFiles, previousFiles); + + return applyDiff( + diff.getFilesAdded(), diff.getFilesRemoved(), tableSchema, basePath, physicalSchema); + } + + private boolean checkTableExists(Table table) { + return DeltaKernelUtils.tableExists(engine, table.getPath(engine)); + } + + public Seq applyDiff( + InternalFilesDiff internalFilesDiff, + InternalSchema tableSchema, + String tableBasePath, + StructType physicalSchema) { + List removeActions = + internalFilesDiff.dataFilesRemoved().stream() + .map(dFile -> createAddFileAction(dFile, tableSchema, tableBasePath, physicalSchema)) + .map(addFile -> new RemoveFile(addFile.toRemoveFileRow(false, Optional.empty()))) + .collect(CustomCollectors.toList(internalFilesDiff.dataFilesRemoved().size())); + return applyDiff( + internalFilesDiff.dataFilesAdded(), + removeActions, + tableSchema, + tableBasePath, + physicalSchema); + } + + private Seq applyDiff( + Set filesAdded, + Collection removeFileActions, + InternalSchema tableSchema, + String tableBasePath, + StructType physicalSchema) { + Stream addActions = + filesAdded.stream() + .filter(InternalDataFile.class::isInstance) + .map(file -> (InternalDataFile) file) + .map(dFile -> createAddFileAction(dFile, tableSchema, tableBasePath, physicalSchema)); + int totalActions = filesAdded.size() + removeFileActions.size(); + List allActions = + Stream.concat(addActions, removeFileActions.stream()) + .collect(CustomCollectors.toList(totalActions)); + return JavaConverters.asScalaBuffer(allActions).toSeq(); + } + + private AddFile createAddFileAction( + InternalDataFile dataFile, + InternalSchema tableSchema, + String tableBasePath, + StructType physicalSchema) { + 
// Convert partition values from Map to MapValue + Map partitionValuesMap = + deltaKernelPartitionExtractor.partitionValueSerialization(dataFile); + MapValue partitionValues = convertToMapValue(partitionValuesMap); + + // Generate column stats if enabled + Optional stats = Optional.empty(); + if (includeColumnStats) { + stats = + Optional.of( + convertToDataFileStatistics(dataFile.getRecordCount(), dataFile.getColumnStats())); + } + + Row addFileRow = + AddFile.createAddFileRow( + physicalSchema, + // Delta Lake supports relative and absolute paths in theory but relative paths seem + // more commonly supported by query engines in our testing + PathUtils.getRelativePath(dataFile.getPhysicalPath(), tableBasePath), + partitionValues, + dataFile.getFileSizeBytes(), + dataFile.getLastModified(), + true, // dataChange + Optional.empty(), // deletionVector + Optional.empty(), // tags + Optional.empty(), // baseRowId + Optional.empty(), // defaultRowCommitVersion + stats // stats - converted from InternalDataFile column stats + ); + + // Wrap the Row back into an AddFile object so we can use its methods + return new AddFile(addFileRow); + } + + /** + * Converts XTable's internal column statistics to Delta Kernel's DataFileStatistics format. 
+ * + * @param recordCount the number of records in the file + * @param columnStats the list of column statistics from InternalDataFile + * @return DataFileStatistics object for Delta Kernel + */ + private DataFileStatistics convertToDataFileStatistics( + long recordCount, List columnStats) { + if (columnStats == null || columnStats.isEmpty()) { + return new DataFileStatistics(recordCount, new HashMap<>(), new HashMap<>(), new HashMap<>()); + } + + Map minValues = new HashMap<>(); + Map maxValues = new HashMap<>(); + Map nullCounts = new HashMap<>(); + + for (ColumnStat columnStat : columnStats) { + InternalField field = columnStat.getField(); + InternalType dataType = field.getSchema().getDataType(); + + // Only process supported types for statistics + if (!isSupportedStatsType(dataType)) { + continue; + } + + // Create Column reference from field path + Column column = new Column(field.getPathParts()); + + // Extract min/max from range + Range range = columnStat.getRange(); + if (range != null) { + Object minValue = range.getMinValue(); + Object maxValue = range.getMaxValue(); + + if (minValue != null) { + Literal minLiteral = convertToLiteral(minValue, field.getSchema()); + if (minLiteral != null) { + minValues.put(column, minLiteral); + } + } + + if (maxValue != null) { + Literal maxLiteral = convertToLiteral(maxValue, field.getSchema()); + if (maxLiteral != null) { + maxValues.put(column, maxLiteral); + } + } + } + + // Add null count + nullCounts.put(column, columnStat.getNumNulls()); + } + + return new DataFileStatistics(recordCount, minValues, maxValues, nullCounts); + } + + /** Checks if a data type is supported for statistics. 
*/ + private boolean isSupportedStatsType(InternalType type) { + return type == InternalType.BOOLEAN + || type == InternalType.DATE + || type == InternalType.DECIMAL + || type == InternalType.DOUBLE + || type == InternalType.INT + || type == InternalType.LONG + || type == InternalType.FLOAT + || type == InternalType.STRING + || type == InternalType.TIMESTAMP + || type == InternalType.TIMESTAMP_NTZ; + } + + /** Converts an XTable value to a Delta Kernel Literal based on the field schema. */ + private Literal convertToLiteral(Object value, InternalSchema fieldSchema) { + InternalType dataType = fieldSchema.getDataType(); + + switch (dataType) { + case BOOLEAN: + return Literal.ofBoolean((Boolean) value); + case INT: + return Literal.ofInt(((Number) value).intValue()); + case LONG: + return Literal.ofLong(((Number) value).longValue()); + case FLOAT: + return Literal.ofFloat(((Number) value).floatValue()); + case DOUBLE: + return Literal.ofDouble(((Number) value).doubleValue()); + case STRING: + return Literal.ofString((String) value); + case DATE: + // XTable stores dates as days since epoch (int) + return Literal.ofDate(((Number) value).intValue()); + case TIMESTAMP: + // XTable stores timestamps as microseconds since epoch + return Literal.ofTimestamp(((Number) value).longValue()); + case TIMESTAMP_NTZ: + // XTable stores timestamp_ntz as microseconds since epoch + return Literal.ofTimestampNtz(((Number) value).longValue()); + case DECIMAL: + // Extract precision and scale from schema metadata + Map metadata = fieldSchema.getMetadata(); + int precision = 10; // default precision + int scale = 0; // default scale + if (metadata != null) { + Object precisionObj = metadata.get(InternalSchema.MetadataKey.DECIMAL_PRECISION); + Object scaleObj = metadata.get(InternalSchema.MetadataKey.DECIMAL_SCALE); + if (precisionObj instanceof Number) { + precision = ((Number) precisionObj).intValue(); + } + if (scaleObj instanceof Number) { + scale = ((Number) scaleObj).intValue(); + 
} + } + return Literal.ofDecimal((java.math.BigDecimal) value, precision, scale); + default: + // Unsupported type for stats + return null; + } + } + + private MapValue convertToMapValue(Map map) { + return VectorUtils.stringStringMapValue(map); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelSchemaExtractor.java index e3da2e7d2..0f0364ba6 100644 --- a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelSchemaExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelSchemaExtractor.java @@ -37,12 +37,14 @@ import io.delta.kernel.types.LongType; import io.delta.kernel.types.MapType; import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; import io.delta.kernel.types.StructType; import io.delta.kernel.types.TimestampNTZType; import io.delta.kernel.types.TimestampType; import org.apache.xtable.collectors.CustomCollectors; import org.apache.xtable.delta.DeltaPartitionExtractor; +import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.InternalType; @@ -66,9 +68,6 @@ public InternalSchema toInternalSchema(StructType structType) { return toInternalSchema(structType, null, false, null, null); } - String trimmedTypeName = ""; - InternalType type = null; - private InternalSchema toInternalSchema( DataType dataType, String parentPath, @@ -76,6 +75,8 @@ private InternalSchema toInternalSchema( String comment, FieldMetadata originalMetadata) { + String trimmedTypeName = ""; + InternalType type = null; Map metadata = null; List fields = null; @@ -229,4 +230,118 @@ private InternalSchema toInternalSchema( .fields(fields) .build(); } + + /** + * Converts an InternalSchema to Delta Kernel StructType. 
+ * + * @param internalSchema the internal schema representation + * @return Delta Kernel StructType + */ + public StructType fromInternalSchema(InternalSchema internalSchema) { + List fields = + internalSchema.getFields().stream() + .map( + field -> + new StructField( + field.getName(), + convertFieldType(field), + field.getSchema().isNullable(), + getFieldMetadata(field.getSchema()))) + .collect(CustomCollectors.toList(internalSchema.getFields().size())); + return new StructType(fields); + } + + /** + * Converts an InternalField to Delta Kernel DataType. + * + * @param field the internal field + * @return Delta Kernel DataType + */ + private DataType convertFieldType(InternalField field) { + switch (field.getSchema().getDataType()) { + case STRING: + case ENUM: + return StringType.STRING; + case INT: + return IntegerType.INTEGER; + case LONG: + return LongType.LONG; + case BYTES: + case FIXED: + case UUID: + return BinaryType.BINARY; + case BOOLEAN: + return BooleanType.BOOLEAN; + case FLOAT: + return FloatType.FLOAT; + case DATE: + return DateType.DATE; + case TIMESTAMP: + return TimestampType.TIMESTAMP; + case TIMESTAMP_NTZ: + return TimestampNTZType.TIMESTAMP_NTZ; + case DOUBLE: + return DoubleType.DOUBLE; + case DECIMAL: + int precision = + (int) field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION); + int scale = + (int) field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE); + return new DecimalType(precision, scale); + case RECORD: + return fromInternalSchema(field.getSchema()); + case MAP: + InternalField key = + field.getSchema().getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Invalid map schema")); + InternalField value = + field.getSchema().getFields().stream() + .filter( + mapField -> + InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName())) + 
.findFirst() + .orElseThrow(() -> new IllegalStateException("Invalid map schema")); + return new MapType( + convertFieldType(key), convertFieldType(value), value.getSchema().isNullable()); + case LIST: + InternalField element = + field.getSchema().getFields().stream() + .filter( + arrayField -> + InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals( + arrayField.getName())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Invalid array schema")); + return new ArrayType(convertFieldType(element), element.getSchema().isNullable()); + default: + throw new NotSupportedException("Unsupported type: " + field.getSchema().getDataType()); + } + } + + /** + * Creates Delta Kernel FieldMetadata from InternalSchema. + * + * @param schema the internal schema + * @return Delta Kernel FieldMetadata + */ + private FieldMetadata getFieldMetadata(InternalSchema schema) { + FieldMetadata.Builder metadataBuilder = FieldMetadata.builder(); + + // Handle UUID type + InternalType type = schema.getDataType(); + if (type == InternalType.UUID) { + metadataBuilder.putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid"); + } + + // Handle comment + if (schema.getComment() != null) { + metadataBuilder.putString("comment", schema.getComment()); + } + + return metadataBuilder.build(); + } } diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelStatsExtractor.java index a1ff2b599..a359b44be 100644 --- a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelStatsExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelStatsExtractor.java @@ -18,307 +18,72 @@ package org.apache.xtable.kernel; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Optional; -import 
java.util.Queue; import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Getter; import lombok.NoArgsConstructor; -import lombok.Value; -import lombok.extern.log4j.Log4j2; -import org.apache.commons.lang3.StringUtils; - -import com.fasterxml.jackson.annotation.JsonAnySetter; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import io.delta.kernel.internal.actions.AddFile; -import org.apache.xtable.collectors.CustomCollectors; -import org.apache.xtable.delta.DeltaValueConverter; -import org.apache.xtable.model.exception.ParseException; +import org.apache.xtable.delta.DeltaStatsUtils; import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalSchema; -import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.stat.FileStats; -import org.apache.xtable.model.stat.Range; /** - * DeltaStatsExtractor extracts column stats and also responsible for their serialization leveraging - * {@link DeltaValueConverter}. + * Delta Kernel stats extractor - delegates to {@link DeltaStatsUtils} for shared logic. + * + * @deprecated This class is a thin wrapper around DeltaStatsUtils. Consider using DeltaStatsUtils + * directly. 
*/ -@Log4j2 @NoArgsConstructor(access = AccessLevel.PRIVATE) public class DeltaKernelStatsExtractor { - private static final Set FIELD_TYPES_WITH_STATS_SUPPORT = - new HashSet<>( - Arrays.asList( - InternalType.BOOLEAN, - InternalType.DATE, - InternalType.DECIMAL, - InternalType.DOUBLE, - InternalType.INT, - InternalType.LONG, - InternalType.FLOAT, - InternalType.STRING, - InternalType.TIMESTAMP, - InternalType.TIMESTAMP_NTZ)); - private static final DeltaKernelStatsExtractor INSTANCE = new DeltaKernelStatsExtractor(); - private static final ObjectMapper MAPPER = new ObjectMapper(); - - /* this data structure collects type names of all unrecognized Delta Lake stats. For instance - data file stats in presence of delete vectors would contain 'tightBounds' stat which is - currently not handled by XTable */ - private final Set unsupportedStats = new HashSet<>(); - public static DeltaKernelStatsExtractor getInstance() { return INSTANCE; } + /** + * Converts XTable column statistics to Delta format JSON. 
+ * + * @param schema the table schema + * @param numRecords the number of records + * @param columnStats the column statistics + * @return JSON string in Delta format + * @throws JsonProcessingException if serialization fails + */ public String convertStatsToDeltaFormat( InternalSchema schema, long numRecords, List columnStats) throws JsonProcessingException { - DeltaStats.DeltaStatsBuilder deltaStatsBuilder = DeltaStats.builder(); - deltaStatsBuilder.numRecords(numRecords); - if (columnStats == null) { - return MAPPER.writeValueAsString(deltaStatsBuilder.build()); - } - Set validPaths = getPathsFromStructSchemaForMinAndMaxStats(schema); - List validColumnStats = - columnStats.stream() - .filter(stat -> validPaths.contains(stat.getField().getPath())) - .collect(Collectors.toList()); - DeltaStats deltaStats = - deltaStatsBuilder - .minValues(getMinValues(validColumnStats)) - .maxValues(getMaxValues(validColumnStats)) - .nullCount(getNullCount(validColumnStats)) - .build(); - return MAPPER.writeValueAsString(deltaStats); - } - - private Set getPathsFromStructSchemaForMinAndMaxStats(InternalSchema schema) { - return schema.getAllFields().stream() - .filter( - field -> { - InternalType type = field.getSchema().getDataType(); - return FIELD_TYPES_WITH_STATS_SUPPORT.contains(type); - }) - .map(InternalField::getPath) - .collect(Collectors.toSet()); - } - - private Map getMinValues(List validColumnStats) { - return getValues(validColumnStats, columnStat -> columnStat.getRange().getMinValue()); - } - - private Map getMaxValues(List validColumnStats) { - return getValues(validColumnStats, columnStat -> columnStat.getRange().getMaxValue()); - } - - private Map getValues( - List validColumnStats, Function valueExtractor) { - Map jsonObject = new HashMap<>(); - validColumnStats.forEach( - columnStat -> { - InternalField field = columnStat.getField(); - String[] pathParts = field.getPathParts(); - insertValueAtPath( - jsonObject, - pathParts, - 
DeltaValueConverter.convertToDeltaColumnStatValue( - valueExtractor.apply(columnStat), field.getSchema())); - }); - return jsonObject; - } - - private Map getNullCount(List validColumnStats) { - // TODO: Additional work needed to track nulls maps & arrays. - Map jsonObject = new HashMap<>(); - validColumnStats.forEach( - columnStat -> { - String[] pathParts = columnStat.getField().getPathParts(); - insertValueAtPath(jsonObject, pathParts, columnStat.getNumNulls()); - }); - return jsonObject; - } - - private void insertValueAtPath(Map jsonObject, String[] pathParts, Object value) { - if (pathParts == null || pathParts.length == 0) { - return; - } - Map currObject = jsonObject; - for (int i = 0; i < pathParts.length; i++) { - String part = pathParts[i]; - if (i == pathParts.length - 1) { - currObject.put(part, value); - } else { - if (!currObject.containsKey(part)) { - currObject.put(part, new HashMap()); - } - try { - currObject = (HashMap) currObject.get(part); - } catch (ClassCastException e) { - throw new RuntimeException( - String.format( - "Cannot cast to hashmap while inserting stats at path %s", - String.join("->", pathParts)), - e); - } - } - } - } - - public FileStats getColumnStatsForFile(AddFile addFile, List fields) { - - Optional statsOpt = addFile.getStatsJson(); - if (!statsOpt.isPresent() || StringUtils.isEmpty(statsOpt.get())) { - // No statistics available - return FileStats.builder().columnStats(Collections.emptyList()).numRecords(0).build(); - } - // TODO: Additional work needed to track maps & arrays. 
- try { - DeltaStats deltaStats = MAPPER.readValue(statsOpt.get(), DeltaStats.class); - - collectUnsupportedStats(deltaStats.getAdditionalStats()); - - Map fieldPathToMaxValue = flattenStatMap(deltaStats.getMaxValues()); - Map fieldPathToMinValue = flattenStatMap(deltaStats.getMinValues()); - Map fieldPathToNullCount = flattenStatMap(deltaStats.getNullCount()); - List columnStats = - fields.stream() - .filter(field -> fieldPathToMaxValue.containsKey(field.getPath())) - .map( - field -> { - String fieldPath = field.getPath(); - Object minRaw = fieldPathToMinValue.get(fieldPath); - Object maxRaw = fieldPathToMaxValue.get(fieldPath); - Object nullCountRaw = fieldPathToNullCount.get(fieldPath); - Object minValue = - minRaw != null - ? DeltaValueConverter.convertFromDeltaColumnStatValue( - minRaw, field.getSchema()) - : null; - Object maxValue = - maxRaw != null - ? DeltaValueConverter.convertFromDeltaColumnStatValue( - maxRaw, field.getSchema()) - : null; - long nullCount = - nullCountRaw instanceof Number ? 
((Number) nullCountRaw).longValue() : 0; - Range range = Range.vector(minValue, maxValue); - return ColumnStat.builder() - .field(field) - .numValues(deltaStats.getNumRecords()) - .numNulls(nullCount) - .range(range) - .build(); - }) - .collect(CustomCollectors.toList(fields.size())); - return FileStats.builder() - .columnStats(columnStats) - .numRecords(deltaStats.getNumRecords()) - .build(); - } catch (IOException ex) { - throw new ParseException("Unable to parse stats json", ex); - } - } - - private void collectUnsupportedStats(Map additionalStats) { - if (additionalStats == null || additionalStats.isEmpty()) { - return; - } - - additionalStats.keySet().stream() - .filter(key -> !unsupportedStats.contains(key)) - .forEach( - key -> { - log.info("Unrecognized/unsupported Delta data file stat: {}", key); - unsupportedStats.add(key); - }); + return DeltaStatsUtils.convertStatsToDeltaFormat(schema, numRecords, columnStats); } /** - * Takes the input map which represents a json object and flattens it. + * Extracts column statistics from Delta Kernel AddFile. * - * @param statMap input json map - * @return map with keys representing the dot-path for the field + * @param addFile the Delta Kernel AddFile action + * @param fields the fields to extract stats for + * @return FileStats containing column statistics */ - private Map flattenStatMap(Map statMap) { - Map result = new HashMap<>(); - Queue statFieldQueue = new ArrayDeque<>(); - statFieldQueue.add(StatField.of("", statMap)); - while (!statFieldQueue.isEmpty()) { - StatField statField = statFieldQueue.poll(); - String prefix = statField.getParentPath().isEmpty() ? 
"" : statField.getParentPath() + "."; - statField - .getValues() - .forEach( - (fieldName, value) -> { - String fullName = prefix + fieldName; - if (value instanceof Map) { - statFieldQueue.add(StatField.of(fullName, (Map) value)); - } else { - result.put(fullName, value); - } - }); - } - return result; + public FileStats getColumnStatsForFile(AddFile addFile, List fields) { + // Delta Kernel wraps stats in Optional, extract it or use empty string + String statsJson = addFile.getStatsJson().orElse(""); + return DeltaStatsUtils.parseColumnStatsFromJson(statsJson, fields); } /** - * Returns the names of all unsupported stats that have been discovered during the parsing of - * Delta Lake stats. + * Returns unsupported stats discovered during parsing. * - * @return set of unsupported stats + * @return set of unsupported stat names */ @VisibleForTesting Set getUnsupportedStats() { - return Collections.unmodifiableSet(unsupportedStats); - } - - @Builder - @Value - private static class DeltaStats { - long numRecords; - Map minValues; - Map maxValues; - Map nullCount; - - /* this is a catch-all for any additional stats that are not explicitly handled */ - @JsonIgnore - @Getter(lazy = true) - Map additionalStats = new HashMap<>(); - - @JsonAnySetter - public void setAdditionalStat(String key, Object value) { - getAdditionalStats().put(key, value); - } - } - - @Value - @AllArgsConstructor(staticName = "of") - private static class StatField { - String parentPath; - Map values; + return DeltaStatsUtils.getUnsupportedStats(); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelUtils.java b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelUtils.java new file mode 100644 index 000000000..bfb7fe60a --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/kernel/DeltaKernelUtils.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import lombok.experimental.UtilityClass; + +import io.delta.kernel.Table; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.TableNotFoundException; + +/** + * Utility methods for working with Delta Kernel API. + * + *

+ * <p>This class provides common helper methods used across Delta Kernel integration components to
+ * avoid code duplication and ensure consistent behavior.
+ */
+@UtilityClass
+public class DeltaKernelUtils {
+
+  /**
+   * Checks if a Delta table exists at the specified path.
+   *

+   * <p>NOTE: This method loads the full snapshot, which reads and parses transaction log files.
+   * This is heavyweight but reliable. A lighter approach using {@code
+   * engine.getFileSystemClient().listFrom(basePath + "/_delta_log")} was attempted but had issues
+   * with exception handling - {@code listFrom()} may throw different exception types depending on
+   * the filesystem implementation.
+   *

This method only catches {@link TableNotFoundException}, allowing other exceptions (network + * errors, permission issues, corrupted metadata) to propagate. This ensures real errors are + * visible rather than being silently masked. + * + * @param engine the Delta Kernel engine to use + * @param basePath the path to the Delta table + * @return true if the table exists, false if it doesn't exist + * @throws RuntimeException if there's an error other than table not found (e.g., network issues, + * permissions) + */ + public static boolean tableExists(Engine engine, String basePath) { + try { + Table table = Table.forPath(engine, basePath); + table.getLatestSnapshot(engine); + return true; + } catch (TableNotFoundException e) { + // Expected: table doesn't exist yet + return false; + } + // Let other exceptions propagate (network issues, permissions, corrupted metadata, etc.) + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelDataFileUpdatesExtractor.java b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelDataFileUpdatesExtractor.java new file mode 100644 index 000000000..113c485e0 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelDataFileUpdatesExtractor.java @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.kernel; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import scala.collection.JavaConverters; + +import io.delta.kernel.Table; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.RowBackedAction; +import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.Range; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.PartitionFileGroup; + +public class TestDeltaKernelDataFileUpdatesExtractor { + + @TempDir private Path 
tempDir; + + private Engine engine; + private DeltaKernelDataFileUpdatesExtractor extractor; + private InternalSchema testSchema; + private StructType physicalSchema; + + @BeforeEach + public void setup() { + Configuration hadoopConf = new Configuration(); + engine = DefaultEngine.create(hadoopConf); + + // Create test schema + testSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .fields( + Arrays.asList( + InternalField.builder() + .name("id") + .schema( + InternalSchema.builder() + .name("integer") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("name") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .build())) + .build(); + + // Create physical schema + physicalSchema = + new StructType() + .add(new StructField("id", IntegerType.INTEGER, false)) + .add(new StructField("name", StringType.STRING, true)); + + // Initialize extractor + extractor = + DeltaKernelDataFileUpdatesExtractor.builder() + .engine(engine) + .basePath(tempDir.toString()) + .includeColumnStats(false) + .build(); + } + + @Test + public void testCreateAddFileAction() throws IOException { + // Create a test data file + String testFilePath = tempDir.resolve("test_data.parquet").toString(); + Files.createFile(Paths.get(testFilePath)); + + long expectedFileSize = 1024L; + long expectedRecordCount = 100L; + long expectedModificationTime = Instant.now().toEpochMilli(); + + InternalDataFile dataFile = + InternalDataFile.builder() + .physicalPath(testFilePath) + .fileSizeBytes(expectedFileSize) + .lastModified(expectedModificationTime) + .recordCount(expectedRecordCount) + .partitionValues(Collections.emptyList()) + .columnStats(Collections.emptyList()) + .build(); + + // Create a simple Delta table for testing + Table table = createSimpleDeltaTable(); + + List partitionedDataFiles = + Collections.singletonList( + 
PartitionFileGroup.builder() + .files(Collections.singletonList(dataFile)) + .partitionValues(Collections.emptyList()) + .build()); + + // Execute applySnapshot + scala.collection.Seq actions = + extractor.applySnapshot(table, partitionedDataFiles, testSchema); + + // Verify actions are created + assertNotNull(actions); + List actionList = JavaConverters.seqAsJavaList(actions); + assertFalse(actionList.isEmpty(), "Should have at least one action"); + + // Verify we have AddFile actions + boolean hasAddFile = actionList.stream().anyMatch(action -> action instanceof AddFile); + assertTrue(hasAddFile, "Should contain AddFile actions"); + + // Verify AddFile content + AddFile addFile = + actionList.stream() + .filter(action -> action instanceof AddFile) + .map(action -> (AddFile) action) + .findFirst() + .orElseThrow(() -> new AssertionError("Expected AddFile action not found")); + + // Assert on AddFile properties + assertTrue( + addFile.getPath().contains("test_data.parquet"), + "AddFile path should contain the test file name"); + assertEquals(expectedFileSize, addFile.getSize(), "AddFile size should match expected size"); + + // Verify partition values + assertNotNull(addFile.getPartitionValues(), "AddFile partition values should not be null"); + Map partitionValuesMap = VectorUtils.toJavaMap(addFile.getPartitionValues()); + assertTrue( + partitionValuesMap.isEmpty(), + "AddFile partition values should be empty for non-partitioned table"); + + // Verify modification time is set correctly + assertEquals( + expectedModificationTime, + addFile.getModificationTime(), + "AddFile modification time should match expected time"); + } + + @Test + public void testApplySnapshotWithPartitionedData() throws IOException { + // Create test data files with partitions + String testFilePath1 = tempDir.resolve("partition1/test_data1.parquet").toString(); + String testFilePath2 = tempDir.resolve("partition2/test_data2.parquet").toString(); + 
Files.createDirectories(Paths.get(testFilePath1).getParent()); + Files.createDirectories(Paths.get(testFilePath2).getParent()); + Files.createFile(Paths.get(testFilePath1)); + Files.createFile(Paths.get(testFilePath2)); + + InternalPartitionField partitionField = + InternalPartitionField.builder() + .sourceField( + InternalField.builder() + .name("partition_col") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .build()) + .build()) + .transformType(PartitionTransformType.VALUE) + .build(); + + PartitionValue partitionValue1 = + PartitionValue.builder() + .partitionField(partitionField) + .range(Range.scalar("partition1")) + .build(); + + PartitionValue partitionValue2 = + PartitionValue.builder() + .partitionField(partitionField) + .range(Range.scalar("partition2")) + .build(); + + InternalDataFile dataFile1 = + InternalDataFile.builder() + .physicalPath(testFilePath1) + .fileSizeBytes(1024L) + .lastModified(Instant.now().toEpochMilli()) + .recordCount(50L) + .partitionValues(Collections.singletonList(partitionValue1)) + .columnStats(Collections.emptyList()) + .build(); + + InternalDataFile dataFile2 = + InternalDataFile.builder() + .physicalPath(testFilePath2) + .fileSizeBytes(2048L) + .lastModified(Instant.now().toEpochMilli()) + .recordCount(75L) + .partitionValues(Collections.singletonList(partitionValue2)) + .columnStats(Collections.emptyList()) + .build(); + + Table table = createSimpleDeltaTable(); + + List partitionedDataFiles = + Arrays.asList( + PartitionFileGroup.builder() + .files(Collections.singletonList(dataFile1)) + .partitionValues(Collections.singletonList(partitionValue1)) + .build(), + PartitionFileGroup.builder() + .files(Collections.singletonList(dataFile2)) + .partitionValues(Collections.singletonList(partitionValue2)) + .build()); + + // Execute applySnapshot + scala.collection.Seq actions = + extractor.applySnapshot(table, partitionedDataFiles, testSchema); + + // Verify + assertNotNull(actions); + 
List actionList = JavaConverters.seqAsJavaList(actions); + assertFalse(actionList.isEmpty(), "Should have actions for partitioned data"); + + // Should have AddFile actions for new files + long addFileCount = actionList.stream().filter(action -> action instanceof AddFile).count(); + assertTrue(addFileCount >= 2, "Should have at least 2 AddFile actions"); + } + + @Test + public void testDifferentialSyncWithExistingData() throws IOException { + // This test simulates a real differential sync scenario: + // 1. Delta table has existing files: file1.parquet, file2.parquet + // 2. New sync brings: file2.parquet (unchanged), file3.parquet (new) + // 3. Expected result: AddFile for file3, RemoveFile for file1 + + // Step 1: Create a Delta table with existing data + Path tablePath = tempDir.resolve("delta_table_with_data"); + Files.createDirectories(tablePath); + Path deltaLogPath = tablePath.resolve("_delta_log"); + Files.createDirectories(deltaLogPath); + + // Create existing data files + Path existingFile1 = tablePath.resolve("file1.parquet"); + Path existingFile2 = tablePath.resolve("file2.parquet"); + Files.createFile(existingFile1); + Files.createFile(existingFile2); + + // Create initial commit with file1 and file2 + Path initialCommit = deltaLogPath.resolve("00000000000000000000.json"); + String initialCommitJson = + "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n" + + "{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"" + + physicalSchema.toJson().replace("\"", "\\\"") + + "\",\"partitionColumns\":[],\"configuration\":{},\"createdTime\":" + + System.currentTimeMillis() + + "}}\n" + + "{\"add\":{\"path\":\"file1.parquet\",\"partitionValues\":{},\"size\":1024,\"modificationTime\":" + + Instant.now().toEpochMilli() + + ",\"dataChange\":true,\"stats\":\"{}\"}}\n" + + "{\"add\":{\"path\":\"file2.parquet\",\"partitionValues\":{},\"size\":2048,\"modificationTime\":" + + Instant.now().toEpochMilli() 
+ + ",\"dataChange\":true,\"stats\":\"{}\"}}\n"; + Files.write(initialCommit, initialCommitJson.getBytes(StandardCharsets.UTF_8)); + + // Create the table + Table table = Table.forPath(engine, tablePath.toString()); + assertNotNull(table); + + // Step 2: Prepare new sync data - file2 (unchanged) + file3 (new) + Path newFile3 = tablePath.resolve("file3.parquet"); + Files.createFile(newFile3); + + // IMPORTANT: Convert paths to absolute URI strings for consistent comparison with Delta Kernel + // Delta Kernel uses Hadoop Path which produces URI format (file:/...), not plain string paths + String file2PhysicalPath = new org.apache.hadoop.fs.Path(existingFile2.toUri()).toString(); + String file3PhysicalPath = new org.apache.hadoop.fs.Path(newFile3.toUri()).toString(); + + InternalDataFile dataFile2 = + InternalDataFile.builder() + .physicalPath(file2PhysicalPath) + .fileSizeBytes(2048L) + .lastModified(Instant.now().toEpochMilli()) + .recordCount(100L) + .partitionValues(Collections.emptyList()) + .columnStats(Collections.emptyList()) + .build(); + + InternalDataFile dataFile3 = + InternalDataFile.builder() + .physicalPath(file3PhysicalPath) + .fileSizeBytes(3072L) + .lastModified(Instant.now().toEpochMilli()) + .recordCount(150L) + .partitionValues(Collections.emptyList()) + .columnStats(Collections.emptyList()) + .build(); + + List newPartitionedDataFiles = + Collections.singletonList( + PartitionFileGroup.builder() + .files(Arrays.asList(dataFile2, dataFile3)) + .partitionValues(Collections.emptyList()) + .build()); + + // Step 3: Apply snapshot (differential sync) + DeltaKernelDataFileUpdatesExtractor syncExtractor = + DeltaKernelDataFileUpdatesExtractor.builder() + .engine(engine) + .basePath(tablePath.toString()) + .includeColumnStats(false) + .build(); + + scala.collection.Seq actions = + syncExtractor.applySnapshot(table, newPartitionedDataFiles, testSchema); + + // Step 4: Verify the differential sync results + assertNotNull(actions, "Actions should not be 
null"); + List actionList = JavaConverters.seqAsJavaList(actions); + assertFalse(actionList.isEmpty(), "Should have actions for differential sync"); + + // Count AddFile and RemoveFile actions + long addFileCount = actionList.stream().filter(action -> action instanceof AddFile).count(); + long removeFileCount = + actionList.stream().filter(action -> action instanceof RemoveFile).count(); + + // Verify: Should have exactly 1 AddFile for file3 (new file) + assertEquals(1, addFileCount, "Should have exactly 1 AddFile action for new file (file3)"); + + // Verify: Should have exactly 1 RemoveFile for file1 (removed from new sync) + assertEquals( + 1, + removeFileCount, + "Should have exactly 1 RemoveFile action for file1 that's not in new sync"); + + // Verify specific files in actions + boolean hasFile3Add = + actionList.stream() + .filter(action -> action instanceof AddFile) + .map(action -> (AddFile) action) + .anyMatch(addFile -> addFile.getPath().contains("file3.parquet")); + + assertTrue(hasFile3Add, "Should have AddFile action for file3.parquet"); + + // Verify file1 is in RemoveFile actions + boolean hasFile1Remove = + actionList.stream() + .filter(action -> action instanceof RemoveFile) + .map(action -> (RemoveFile) action) + .anyMatch(removeFile -> removeFile.getPath().contains("file1.parquet")); + + assertTrue(hasFile1Remove, "Should have RemoveFile action for file1.parquet"); + + // Note: file2 should not appear in actions as it's unchanged + } + + private Table createSimpleDeltaTable() { + try { + // Create a simple Delta table directory structure + Path tablePath = tempDir.resolve("delta_table"); + Files.createDirectories(tablePath); + Path deltaLogPath = tablePath.resolve("_delta_log"); + Files.createDirectories(deltaLogPath); + + // Create an empty commit file to make it a valid Delta table + Path commitFile = deltaLogPath.resolve("00000000000000000000.json"); + String commitJson = + "{\"protocol\":{\"minReaderVersion\":1,\"minWriterVersion\":2}}\n" + + 
"{\"metaData\":{\"id\":\"test-id\",\"format\":{\"provider\":\"parquet\",\"options\":{}},\"schemaString\":\"" + + physicalSchema.toJson().replace("\"", "\\\"") + + "\",\"partitionColumns\":[],\"configuration\":{},\"createdTime\":" + + System.currentTimeMillis() + + "}}\n"; + Files.write(commitFile, commitJson.getBytes(StandardCharsets.UTF_8)); + + return Table.forPath(engine, tablePath.toString()); + } catch (IOException e) { + throw new RuntimeException("Failed to create test Delta table", e); + } + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelReadWriteIntegration.java b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelReadWriteIntegration.java new file mode 100644 index 000000000..9a24fc009 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelReadWriteIntegration.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.kernel; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.SnapshotImpl; + +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.model.InternalSnapshot; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.Range; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.PartitionFileGroup; +import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.spi.sync.TableFormatSync; + +/** + * Comprehensive end-to-end integration 
test for Delta Kernel read and write operations. + * + *

This test validates: 1. Writing data to Delta tables using DeltaKernelConversionTarget 2. + * Reading data from Delta tables using DeltaKernelConversionSource 3. Round-trip data integrity + * (write → read → validate) 4. Partitioned tables 5. Incremental updates (add/remove files) 6. Time + * travel (version-based reads) 7. Empty table handling + */ +public class TestDeltaKernelReadWriteIntegration { + private static final Random RANDOM = new Random(); + private static final Instant LAST_COMMIT_TIME = Instant.now(); + + @TempDir public Path tempDir; + private Engine engine; + + @BeforeEach + public void setup() { + Configuration hadoopConf = new Configuration(); + engine = DefaultEngine.create(hadoopConf); + } + + /** + * Test 1: Basic Write and Read Validates that data written to Delta can be read back correctly. + */ + @Test + public void testBasicWriteAndRead() throws Exception { + String tableName = "test_basic_" + UUID.randomUUID(); + Path basePath = tempDir.resolve(tableName); + Files.createDirectories(basePath); + + // === WRITE PHASE === + InternalSchema schema = createSimpleSchema(); + DeltaKernelConversionTarget writer = createWriter(tableName, basePath); + + // Create test data files + InternalDataFile file1 = createDataFile(1, Collections.emptyList(), basePath); + InternalDataFile file2 = createDataFile(2, Collections.emptyList(), basePath); + + // Write data to Delta table + InternalTable writeTable = createInternalTable(tableName, basePath, schema, null); + InternalSnapshot snapshot = buildSnapshot(writeTable, "0", file1, file2); + TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot); + + // Verify Delta table was created + assertTrue(Files.exists(basePath.resolve("_delta_log")), "Delta log directory should exist"); + + // === READ PHASE === + DeltaKernelConversionSource reader = createReader(tableName, basePath); + + // Read current table metadata + InternalTable readTable = reader.getCurrentTable(); + 
assertNotNull(readTable, "Should be able to read table"); + assertEquals(tableName, readTable.getName()); + + // Normalize paths for comparison (handle file:// prefix differences) + String expectedPath = basePath.toString(); + String actualPath = readTable.getBasePath().replace("file://", "").replace("file:", ""); + assertTrue( + actualPath.endsWith(expectedPath) || actualPath.equals(expectedPath), + "Base path should match. Expected: " + expectedPath + ", Actual: " + actualPath); + + // Verify schema + InternalSchema readSchema = readTable.getReadSchema(); + assertNotNull(readSchema); + assertEquals(schema.getFields().size(), readSchema.getFields().size()); + + // Read current snapshot + InternalSnapshot readSnapshot = reader.getCurrentSnapshot(); + assertNotNull(readSnapshot); + + // Extract data files from partition groups (files with same partition values are grouped) + List dataFiles = extractDataFiles(readSnapshot); + assertEquals(2, dataFiles.size(), "Should have 2 files in snapshot"); + + // Compare by physical path to uniquely identify files (not by size which could be duplicated) + assertTrue( + dataFiles.stream().anyMatch(f -> f.getPhysicalPath().contains("data_1.parquet")), + "Should contain file1 (data_1.parquet)"); + assertTrue( + dataFiles.stream().anyMatch(f -> f.getPhysicalPath().contains("data_2.parquet")), + "Should contain file2 (data_2.parquet)"); + } + + /** + * Test 2: Partitioned Table Write and Read Validates partition handling in both write and read + * operations. 
+ */ + @Test + public void testPartitionedTableRoundTrip() throws Exception { + String tableName = "test_partitioned_" + UUID.randomUUID(); + Path basePath = tempDir.resolve(tableName); + Files.createDirectories(basePath); + + DeltaKernelConversionTarget writer = createWriter(tableName, basePath); + DeltaKernelConversionSource reader = createReader(tableName, basePath); + + // Define partition field + InternalPartitionField partitionField = + InternalPartitionField.builder() + .sourceField( + InternalField.builder() + .name("string_field") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .build()) + .build()) + .transformType(PartitionTransformType.VALUE) + .build(); + + // === WRITE PHASE === + InternalSchema schema = createSimpleSchema(); + InternalTable table = + createInternalTable(tableName, basePath, schema, Collections.singletonList(partitionField)); + + // Create partitioned data files + List partition1 = + Collections.singletonList( + PartitionValue.builder() + .partitionField(partitionField) + .range(Range.scalar("category_a")) + .build()); + List partition2 = + Collections.singletonList( + PartitionValue.builder() + .partitionField(partitionField) + .range(Range.scalar("category_b")) + .build()); + + InternalDataFile file1 = createDataFile(1, partition1, basePath); + InternalDataFile file2 = createDataFile(2, partition1, basePath); + InternalDataFile file3 = createDataFile(3, partition2, basePath); + + InternalSnapshot snapshot = buildSnapshot(table, "0", file1, file2, file3); + TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot); + + // === READ PHASE === + InternalTable readTable = reader.getCurrentTable(); + + // Verify partitioning + assertNotNull(readTable.getPartitioningFields()); + assertEquals(1, readTable.getPartitioningFields().size()); + assertEquals( + "string_field", readTable.getPartitioningFields().get(0).getSourceField().getName()); + + // Verify all files are 
present + InternalSnapshot readSnapshot = reader.getCurrentSnapshot(); + List dataFiles = extractDataFiles(readSnapshot); + assertEquals(3, dataFiles.size(), "Should have all 3 partitioned files"); + + // Verify partition columns in Delta metadata + Table deltaTable = Table.forPath(engine, basePath.toString()); + Snapshot deltaSnapshot = deltaTable.getLatestSnapshot(engine); + SnapshotImpl snapshotImpl = (SnapshotImpl) deltaSnapshot; + Set partitionColumns = snapshotImpl.getMetadata().getPartitionColNames(); + assertEquals(1, partitionColumns.size()); + assertTrue(partitionColumns.contains("string_field")); + } + + /** + * Test 3: Incremental Updates (Add/Remove Files) Validates that incremental changes are properly + * handled. + */ + @Test + public void testIncrementalUpdates() throws Exception { + String tableName = "test_incremental_" + UUID.randomUUID(); + Path basePath = tempDir.resolve(tableName); + Files.createDirectories(basePath); + + DeltaKernelConversionTarget writer = createWriter(tableName, basePath); + DeltaKernelConversionSource reader = createReader(tableName, basePath); + + InternalSchema schema = createSimpleSchema(); + InternalTable table = createInternalTable(tableName, basePath, schema, null); + + // === SNAPSHOT 1: Initial files === + InternalDataFile file1 = createDataFile(1, Collections.emptyList(), basePath); + InternalDataFile file2 = createDataFile(2, Collections.emptyList(), basePath); + InternalSnapshot snapshot1 = buildSnapshot(table, "0", file1, file2); + TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot1); + + InternalSnapshot read1 = reader.getCurrentSnapshot(); + assertEquals(2, extractDataFiles(read1).size(), "Should have 2 files after first snapshot"); + + // === SNAPSHOT 2: Remove file1, keep file2, add file3 === + InternalDataFile file3 = createDataFile(3, Collections.emptyList(), basePath); + InternalSnapshot snapshot2 = buildSnapshot(table, "1", file2, file3); + 
TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot2); + + InternalSnapshot read2 = reader.getCurrentSnapshot(); + List files2 = extractDataFiles(read2); + assertEquals(2, files2.size(), "Should have 2 files after second snapshot"); + + // Verify correct files are present (compare by path, not size) + assertTrue( + files2.stream().anyMatch(f -> f.getPhysicalPath().contains("data_2.parquet")), + "file2 should be present"); + assertTrue( + files2.stream().anyMatch(f -> f.getPhysicalPath().contains("data_3.parquet")), + "file3 should be present"); + assertFalse( + files2.stream().anyMatch(f -> f.getPhysicalPath().contains("data_1.parquet")), + "file1 should be removed"); + + // === SNAPSHOT 3: Replace all files === + InternalDataFile file4 = createDataFile(4, Collections.emptyList(), basePath); + InternalSnapshot snapshot3 = buildSnapshot(table, "2", file4); + TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot3); + + InternalSnapshot read3 = reader.getCurrentSnapshot(); + List files3 = extractDataFiles(read3); + assertEquals(1, files3.size(), "Should have only 1 file after third snapshot"); + assertTrue( + files3.get(0).getPhysicalPath().contains("data_4.parquet"), + "Should contain file4 (data_4.parquet)"); + } + + /** Test 4: Read at Specific Version (Time Travel) Validates version-based reading. 
*/ + @Test + public void testReadAtVersion() throws Exception { + String tableName = "test_versioned_" + UUID.randomUUID(); + Path basePath = tempDir.resolve(tableName); + Files.createDirectories(basePath); + + DeltaKernelConversionTarget writer = createWriter(tableName, basePath); + DeltaKernelConversionSource reader = createReader(tableName, basePath); + + InternalSchema schema = createSimpleSchema(); + InternalTable table = createInternalTable(tableName, basePath, schema, null); + + // Write version 0 + InternalDataFile file1 = createDataFile(1, Collections.emptyList(), basePath); + InternalSnapshot snapshot0 = buildSnapshot(table, "0", file1); + TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot0); + + // Write version 1 + InternalDataFile file2 = createDataFile(2, Collections.emptyList(), basePath); + InternalSnapshot snapshot1 = buildSnapshot(table, "1", file1, file2); + TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot1); + + // Write version 2 + InternalDataFile file3 = createDataFile(3, Collections.emptyList(), basePath); + InternalSnapshot snapshot2 = buildSnapshot(table, "2", file2, file3); + TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), snapshot2); + + // Read at version 0 (should have only file1) + InternalTable tableV0 = reader.getTable(0L); + assertNotNull(tableV0); + + // Read at version 1 (should have file1 and file2) + InternalTable tableV1 = reader.getTable(1L); + assertNotNull(tableV1); + + // Read latest version (should have file2 and file3) + InternalSnapshot latestSnapshot = reader.getCurrentSnapshot(); + List latestFiles = extractDataFiles(latestSnapshot); + assertEquals(2, latestFiles.size()); + + // Verify latest version doesn't have file1 (compare by path, not size) + assertFalse( + latestFiles.stream().anyMatch(f -> f.getPhysicalPath().contains("data_1.parquet")), + "Latest version should not have file1"); + } + + /** Test 5: Empty 
Table Creation and Read Validates handling of empty tables. */ + @Test + public void testEmptyTableRoundTrip() throws Exception { + String tableName = "test_empty_" + UUID.randomUUID(); + Path basePath = tempDir.resolve(tableName); + Files.createDirectories(basePath); + + DeltaKernelConversionTarget writer = createWriter(tableName, basePath); + DeltaKernelConversionSource reader = createReader(tableName, basePath); + + // Write empty table with just schema + InternalSchema schema = createSimpleSchema(); + InternalTable table = createInternalTable(tableName, basePath, schema, null); + InternalSnapshot emptySnapshot = buildSnapshot(table, "0"); // No files + + TableFormatSync.getInstance().syncSnapshot(Collections.singletonList(writer), emptySnapshot); + + // Read back + InternalTable readTable = reader.getCurrentTable(); + assertNotNull(readTable); + assertEquals(schema.getFields().size(), readTable.getReadSchema().getFields().size()); + + InternalSnapshot readSnapshot = reader.getCurrentSnapshot(); + assertNotNull(readSnapshot); + assertEquals(0, readSnapshot.getPartitionedDataFiles().size(), "Should have no files"); + } + + // ==================== Helper Methods ==================== + + private DeltaKernelConversionTarget createWriter(String tableName, Path basePath) { + return new DeltaKernelConversionTarget( + TargetTable.builder() + .name(tableName) + .basePath(basePath.toString()) + .metadataRetention(Duration.of(1, ChronoUnit.HOURS)) + .formatName(TableFormat.DELTA) + .build(), + engine); + } + + private DeltaKernelConversionSource createReader(String tableName, Path basePath) { + return DeltaKernelConversionSource.builder() + .basePath(basePath.toString()) + .tableName(tableName) + .engine(engine) + .build(); + } + + private InternalSchema createSimpleSchema() { + Map timestampMetadata = new HashMap<>(); + timestampMetadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS); + + return InternalSchema.builder() + 
.dataType(InternalType.RECORD) + .name("test_schema") + .fields( + Arrays.asList( + InternalField.builder() + .name("id") + .schema( + InternalSchema.builder() + .name("long") + .dataType(InternalType.LONG) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("string_field") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .build(), + InternalField.builder() + .name("int_field") + .schema( + InternalSchema.builder() + .name("int") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build(), + InternalField.builder() + .name("timestamp_field") + .schema( + InternalSchema.builder() + .name("timestamp") + .dataType(InternalType.TIMESTAMP) + .isNullable(true) + .metadata(timestampMetadata) + .build()) + .build())) + .isNullable(false) + .build(); + } + + private InternalTable createInternalTable( + String tableName, + Path basePath, + InternalSchema schema, + List partitionFields) { + return InternalTable.builder() + .name(tableName) + .basePath(basePath.toUri().toString()) + .layoutStrategy(DataLayoutStrategy.FLAT) + .tableFormat(TableFormat.HUDI) + .readSchema(schema) + .partitioningFields(partitionFields) + .latestCommitTime(LAST_COMMIT_TIME) + .build(); + } + + private InternalSnapshot buildSnapshot( + InternalTable table, String sourceIdentifier, InternalDataFile... 
dataFiles) { + return InternalSnapshot.builder() + .table(table) + .partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles))) + .sourceIdentifier(sourceIdentifier) + .build(); + } + + private InternalDataFile createDataFile( + int index, List partitionValues, Path basePath) { + try { + Path filePath = basePath.resolve("data_" + index + ".parquet"); + Files.createFile(filePath); + + String physicalPath = new org.apache.hadoop.fs.Path(filePath.toUri()).toString(); + + return InternalDataFile.builder() + .fileFormat(FileFormat.APACHE_PARQUET) + .fileSizeBytes(1000 + index) + .physicalPath(physicalPath) + .recordCount(100) + .partitionValues(partitionValues) + .columnStats(Collections.emptyList()) + .lastModified(Instant.now().toEpochMilli()) + .build(); + } catch (IOException e) { + throw new RuntimeException("Failed to create test data file", e); + } + } + + private List extractDataFiles(InternalSnapshot snapshot) { + List files = new ArrayList<>(); + for (PartitionFileGroup group : snapshot.getPartitionedDataFiles()) { + files.addAll(group.getDataFiles()); + } + return files; + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSchemaExtractor.java index 4e242da1d..47bede2e7 100644 --- a/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSchemaExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSchemaExtractor.java @@ -18,6 +18,10 @@ package org.apache.xtable.kernel; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -38,6 +42,7 @@ import io.delta.kernel.types.LongType; import io.delta.kernel.types.MapType; import io.delta.kernel.types.StringType; +import 
io.delta.kernel.types.StructField; import io.delta.kernel.types.StructType; import io.delta.kernel.types.TimestampNTZType; import io.delta.kernel.types.TimestampType; @@ -47,6 +52,11 @@ import org.apache.xtable.model.schema.InternalType; public class TestDeltaKernelSchemaExtractor { + + private final DeltaKernelSchemaExtractor extractor = DeltaKernelSchemaExtractor.getInstance(); + + // ========== Tests for toInternalSchema() ========== + @Test public void testPrimitiveTypes() { Map decimalMetadata = new HashMap<>(); @@ -873,4 +883,591 @@ public void testIcebergToDeltaUUIDSupport() { internalSchema, DeltaKernelSchemaExtractor.getInstance().toInternalSchema(structRepresentation)); } + + // ========== Tests for fromInternalSchema() ========== + + @Test + public void testFromInternalSchemaSimpleTypes() { + // Create an InternalSchema with simple types + InternalField idField = + InternalField.builder() + .name("id") + .schema( + InternalSchema.builder() + .name("integer") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build(); + + InternalField nameField = + InternalField.builder() + .name("name") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .build(); + + InternalField activeField = + InternalField.builder() + .name("active") + .schema( + InternalSchema.builder() + .name("boolean") + .dataType(InternalType.BOOLEAN) + .isNullable(false) + .build()) + .build(); + + InternalSchema internalSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .fields(Arrays.asList(idField, nameField, activeField)) + .build(); + + // Convert to Delta Kernel StructType + StructType deltaSchema = extractor.fromInternalSchema(internalSchema); + + // Verify + assertNotNull(deltaSchema); + assertEquals(3, deltaSchema.fields().size()); + + // Check id field + StructField idDeltaField = deltaSchema.fields().get(0); + assertEquals("id", idDeltaField.getName()); + 
assertEquals(IntegerType.INTEGER, idDeltaField.getDataType()); + assertEquals(false, idDeltaField.isNullable()); + + // Check name field + StructField nameDeltaField = deltaSchema.fields().get(1); + assertEquals("name", nameDeltaField.getName()); + assertEquals(StringType.STRING, nameDeltaField.getDataType()); + assertEquals(true, nameDeltaField.isNullable()); + + // Check active field + StructField activeDeltaField = deltaSchema.fields().get(2); + assertEquals("active", activeDeltaField.getName()); + assertEquals(BooleanType.BOOLEAN, activeDeltaField.getDataType()); + assertEquals(false, activeDeltaField.isNullable()); + } + + @Test + public void testFromInternalSchemaWithUUID() { + // Create an InternalSchema with UUID type + InternalField uuidField = + InternalField.builder() + .name("userId") + .schema( + InternalSchema.builder() + .name("binary") + .dataType(InternalType.UUID) + .isNullable(false) + .build()) + .build(); + + InternalSchema internalSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .fields(Collections.singletonList(uuidField)) + .build(); + + // Convert to Delta Kernel StructType + StructType deltaSchema = extractor.fromInternalSchema(internalSchema); + + // Verify + assertNotNull(deltaSchema); + assertEquals(1, deltaSchema.fields().size()); + + StructField uuidDeltaField = deltaSchema.fields().get(0); + assertEquals("userId", uuidDeltaField.getName()); + assertTrue(uuidDeltaField.getDataType() instanceof BinaryType); + assertEquals(false, uuidDeltaField.isNullable()); + + // Check metadata contains UUID marker + FieldMetadata metadata = uuidDeltaField.getMetadata(); + assertTrue(metadata.contains(InternalSchema.XTABLE_LOGICAL_TYPE)); + assertEquals("uuid", metadata.getString(InternalSchema.XTABLE_LOGICAL_TYPE)); + } + + @Test + public void testFromInternalSchemaWithNestedRecords() { + // Create InternalSchema with multi-level nested structures + // Structure: root { id, person { street, city, nested { 
deepValue } } } + InternalSchema internalSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .fields( + Arrays.asList( + InternalField.builder() + .name("id") + .schema( + InternalSchema.builder() + .name("integer") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("person") + .schema( + InternalSchema.builder() + .name("struct") + .dataType(InternalType.RECORD) + .isNullable(true) + .fields( + Arrays.asList( + InternalField.builder() + .name("street") + .parentPath("person") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("city") + .parentPath("person") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue( + InternalField.Constants.NULL_DEFAULT_VALUE) + .build(), + InternalField.builder() + .name("nested") + .parentPath("person") + .schema( + InternalSchema.builder() + .name("struct") + .dataType(InternalType.RECORD) + .isNullable(false) + .fields( + Collections.singletonList( + InternalField.builder() + .name("deepValue") + .parentPath("person.nested") + .schema( + InternalSchema.builder() + .name("string") + .dataType( + InternalType.STRING) + .isNullable(true) + .build()) + .defaultValue( + InternalField.Constants + .NULL_DEFAULT_VALUE) + .build())) + .build()) + .build())) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build())) + .build(); + + // Convert to Delta Kernel StructType + StructType deltaSchema = extractor.fromInternalSchema(internalSchema); + + // Verify structure + assertNotNull(deltaSchema); + assertEquals(2, deltaSchema.fields().size()); + + // Check id field + StructField idField = deltaSchema.fields().get(0); + assertEquals("id", idField.getName()); + assertEquals(IntegerType.INTEGER, idField.getDataType()); + 
assertEquals(false, idField.isNullable()); + + // Check person nested field + StructField personField = deltaSchema.fields().get(1); + assertEquals("person", personField.getName()); + assertTrue(personField.getDataType() instanceof StructType); + assertEquals(true, personField.isNullable()); + + StructType personType = (StructType) personField.getDataType(); + assertEquals(3, personType.fields().size()); + + // Check nested fields in person + StructField streetField = personType.fields().get(0); + assertEquals("street", streetField.getName()); + assertEquals(StringType.STRING, streetField.getDataType()); + assertEquals(false, streetField.isNullable()); + + StructField cityField = personType.fields().get(1); + assertEquals("city", cityField.getName()); + assertEquals(StringType.STRING, cityField.getDataType()); + assertEquals(true, cityField.isNullable()); + + // Check doubly nested field + StructField nestedField = personType.fields().get(2); + assertEquals("nested", nestedField.getName()); + assertTrue(nestedField.getDataType() instanceof StructType); + assertEquals(false, nestedField.isNullable()); + + StructType nestedType = (StructType) nestedField.getDataType(); + assertEquals(1, nestedType.fields().size()); + + StructField deepValueField = nestedType.fields().get(0); + assertEquals("deepValue", deepValueField.getName()); + assertEquals(StringType.STRING, deepValueField.getDataType()); + assertEquals(true, deepValueField.isNullable()); + } + + @Test + public void testFromInternalSchemaWithLists() { + // Create InternalSchema with lists: scores (List), items (List) + InternalSchema internalSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .fields( + Arrays.asList( + InternalField.builder() + .name("scores") + .schema( + InternalSchema.builder() + .name("array") + .isNullable(false) + .dataType(InternalType.LIST) + .fields( + Collections.singletonList( + InternalField.builder() + 
.name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath("scores") + .schema( + InternalSchema.builder() + .name("integer") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build()) + .build(), + InternalField.builder() + .name("items") + .schema( + InternalSchema.builder() + .name("array") + .isNullable(true) + .dataType(InternalType.LIST) + .fields( + Collections.singletonList( + InternalField.builder() + .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME) + .parentPath("items") + .schema( + InternalSchema.builder() + .name("struct") + .isNullable(true) + .dataType(InternalType.RECORD) + .fields( + Arrays.asList( + InternalField.builder() + .name("itemName") + .parentPath( + "items._one_field_element") + .schema( + InternalSchema.builder() + .name("string") + .dataType( + InternalType.STRING) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("itemPrice") + .parentPath( + "items._one_field_element") + .schema( + InternalSchema.builder() + .name("double") + .dataType( + InternalType.DOUBLE) + .isNullable(true) + .build()) + .defaultValue( + InternalField.Constants + .NULL_DEFAULT_VALUE) + .build())) + .build()) + .build())) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build())) + .build(); + + // Convert to Delta Kernel StructType + StructType deltaSchema = extractor.fromInternalSchema(internalSchema); + + // Verify structure + assertNotNull(deltaSchema); + assertEquals(2, deltaSchema.fields().size()); + + // Check scores (list of int) + StructField scoresField = deltaSchema.fields().get(0); + assertEquals("scores", scoresField.getName()); + assertTrue(scoresField.getDataType() instanceof ArrayType); + assertEquals(false, scoresField.isNullable()); + + ArrayType scoresArrayType = (ArrayType) scoresField.getDataType(); + assertEquals(IntegerType.INTEGER, scoresArrayType.getElementType()); + + // Check items (list of records) + StructField itemsField = 
deltaSchema.fields().get(1); + assertEquals("items", itemsField.getName()); + assertTrue(itemsField.getDataType() instanceof ArrayType); + assertEquals(true, itemsField.isNullable()); + + ArrayType itemsArrayType = (ArrayType) itemsField.getDataType(); + assertTrue(itemsArrayType.getElementType() instanceof StructType); + + StructType itemElementType = (StructType) itemsArrayType.getElementType(); + assertEquals(2, itemElementType.fields().size()); + + StructField itemNameField = itemElementType.fields().get(0); + assertEquals("itemName", itemNameField.getName()); + assertEquals(StringType.STRING, itemNameField.getDataType()); + assertEquals(false, itemNameField.isNullable()); + + StructField itemPriceField = itemElementType.fields().get(1); + assertEquals("itemPrice", itemPriceField.getName()); + assertEquals(DoubleType.DOUBLE, itemPriceField.getDataType()); + assertEquals(true, itemPriceField.isNullable()); + } + + @Test + public void testFromInternalSchemaWithMaps() { + // Create InternalSchema with maps: counts (Map), locations (Map) + InternalSchema internalSchema = + InternalSchema.builder() + .name("record") + .dataType(InternalType.RECORD) + .fields( + Arrays.asList( + InternalField.builder() + .name("counts") + .schema( + InternalSchema.builder() + .name("map") + .isNullable(false) + .dataType(InternalType.MAP) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.MAP_KEY_FIELD_NAME) + .parentPath("counts") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name(InternalField.Constants.MAP_VALUE_FIELD_NAME) + .parentPath("counts") + .schema( + InternalSchema.builder() + .name("integer") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build())) + .build()) + .build(), + InternalField.builder() + .name("locations") + .schema( + InternalSchema.builder() + .name("map") + .isNullable(true) + 
.dataType(InternalType.MAP) + .fields( + Arrays.asList( + InternalField.builder() + .name(InternalField.Constants.MAP_KEY_FIELD_NAME) + .parentPath("locations") + .schema( + InternalSchema.builder() + .name("integer") + .dataType(InternalType.INT) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name(InternalField.Constants.MAP_VALUE_FIELD_NAME) + .parentPath("locations") + .schema( + InternalSchema.builder() + .name("struct") + .isNullable(true) + .dataType(InternalType.RECORD) + .fields( + Arrays.asList( + InternalField.builder() + .name("latitude") + .parentPath( + "locations._one_field_value") + .schema( + InternalSchema.builder() + .name("double") + .dataType( + InternalType.DOUBLE) + .isNullable(false) + .build()) + .build(), + InternalField.builder() + .name("longitude") + .parentPath( + "locations._one_field_value") + .schema( + InternalSchema.builder() + .name("double") + .dataType( + InternalType.DOUBLE) + .isNullable(false) + .build()) + .build())) + .build()) + .build())) + .build()) + .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE) + .build())) + .build(); + + // Convert to Delta Kernel StructType + StructType deltaSchema = extractor.fromInternalSchema(internalSchema); + + // Verify structure + assertNotNull(deltaSchema); + assertEquals(2, deltaSchema.fields().size()); + + // Check counts (map) + StructField countsField = deltaSchema.fields().get(0); + assertEquals("counts", countsField.getName()); + assertTrue(countsField.getDataType() instanceof MapType); + assertEquals(false, countsField.isNullable()); + + MapType countsMapType = (MapType) countsField.getDataType(); + assertEquals(StringType.STRING, countsMapType.getKeyType()); + assertEquals(IntegerType.INTEGER, countsMapType.getValueType()); + + // Check locations (map) + StructField locationsField = deltaSchema.fields().get(1); + assertEquals("locations", locationsField.getName()); + assertTrue(locationsField.getDataType() instanceof MapType); + 
assertEquals(true, locationsField.isNullable()); + + MapType locationsMapType = (MapType) locationsField.getDataType(); + assertEquals(IntegerType.INTEGER, locationsMapType.getKeyType()); + assertTrue(locationsMapType.getValueType() instanceof StructType); + + StructType locationValueType = (StructType) locationsMapType.getValueType(); + assertEquals(2, locationValueType.fields().size()); + + StructField latitudeField = locationValueType.fields().get(0); + assertEquals("latitude", latitudeField.getName()); + assertEquals(DoubleType.DOUBLE, latitudeField.getDataType()); + assertEquals(false, latitudeField.isNullable()); + + StructField longitudeField = locationValueType.fields().get(1); + assertEquals("longitude", longitudeField.getName()); + assertEquals(DoubleType.DOUBLE, longitudeField.getDataType()); + assertEquals(false, longitudeField.isNullable()); + } + + @Test + public void testRoundTripConversion() { + // Create a Delta Kernel StructType + StructType originalDeltaSchema = + new StructType( + Arrays.asList( + new StructField("id", IntegerType.INTEGER, false), + new StructField("name", StringType.STRING, true), + new StructField("score", DoubleType.DOUBLE, false))); + + // Convert to InternalSchema + InternalSchema internalSchema = extractor.toInternalSchema(originalDeltaSchema); + // Convert back to Delta Kernel StructType + StructType convertedDeltaSchema = extractor.fromInternalSchema(internalSchema); + + // Verify structure matches + assertEquals(originalDeltaSchema.fields().size(), convertedDeltaSchema.fields().size()); + + for (int i = 0; i < originalDeltaSchema.fields().size(); i++) { + StructField original = originalDeltaSchema.fields().get(i); + StructField converted = convertedDeltaSchema.fields().get(i); + + assertEquals(original.getName(), converted.getName()); + assertEquals(original.getDataType(), converted.getDataType()); + assertEquals(original.isNullable(), converted.isNullable()); + } + } + + @Test + public void 
testRoundTripConversionWithComplexTypes() { + // Create a complex Delta Kernel StructType with nested records, lists, and maps + StructType addressType = + new StructType() + .add("street", StringType.STRING, false) + .add("city", StringType.STRING, true); + + StructType itemType = + new StructType() + .add("name", StringType.STRING, false) + .add("price", DoubleType.DOUBLE, true); + + StructType originalDeltaSchema = + new StructType() + .add("id", IntegerType.INTEGER, false) + .add("name", StringType.STRING, true) + .add("address", addressType, true) + .add("scores", new ArrayType(IntegerType.INTEGER, false), false) + .add("items", new ArrayType(itemType, true), true) + .add("tags", new MapType(StringType.STRING, StringType.STRING, false), false) + .add("metadata", new MapType(StringType.STRING, itemType, true), true); + + // Convert to InternalSchema + InternalSchema internalSchema = extractor.toInternalSchema(originalDeltaSchema); + + // Convert back to Delta Kernel StructType + StructType convertedDeltaSchema = extractor.fromInternalSchema(internalSchema); + + // Verify structure matches + assertNotNull(convertedDeltaSchema); + assertEquals(originalDeltaSchema.fields().size(), convertedDeltaSchema.fields().size()); + + // Verify each field + for (int i = 0; i < originalDeltaSchema.fields().size(); i++) { + StructField original = originalDeltaSchema.fields().get(i); + StructField converted = convertedDeltaSchema.fields().get(i); + + assertEquals(original.getName(), converted.getName()); + assertEquals(original.isNullable(), converted.isNullable()); + + // For complex types, verify the structure + if (original.getDataType() instanceof StructType) { + assertTrue(converted.getDataType() instanceof StructType); + StructType originalStruct = (StructType) original.getDataType(); + StructType convertedStruct = (StructType) converted.getDataType(); + assertEquals(originalStruct.fields().size(), convertedStruct.fields().size()); + } else if (original.getDataType() 
instanceof ArrayType) { + assertTrue(converted.getDataType() instanceof ArrayType); + } else if (original.getDataType() instanceof MapType) { + assertTrue(converted.getDataType() instanceof MapType); + } else { + assertEquals(original.getDataType(), converted.getDataType()); + } + } + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSync.java b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSync.java new file mode 100644 index 000000000..b67e645fd --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/kernel/TestDeltaKernelSync.java @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.kernel; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.ScanImpl; +import io.delta.kernel.internal.SnapshotImpl; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.utils.CloseableIterator; + +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.exception.NotSupportedException; +import org.apache.xtable.model.InternalSnapshot; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.metadata.TableSyncMetadata; +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import 
org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.schema.PartitionTransformType; +import org.apache.xtable.model.stat.PartitionValue; +import org.apache.xtable.model.stat.Range; +import org.apache.xtable.model.storage.DataLayoutStrategy; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.PartitionFileGroup; +import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.spi.sync.TableFormatSync; + +/** + * Validates that Delta Kernel tables are properly created/updated using + * DeltaKernelConversionTarget. Tests partitioning, schema evolution, and metadata sync without + * Spark SQL dependencies. + */ +public class TestDeltaKernelSync { + private static final Instant LAST_COMMIT_TIME = Instant.ofEpochSecond(1000); + + @TempDir public Path tempDir; + private DeltaKernelConversionTarget conversionTarget; + private Path basePath; + private String tableName; + private Engine engine; + + @BeforeEach + public void setup() throws IOException { + tableName = "test-" + UUID.randomUUID(); + basePath = tempDir.resolve(tableName); + Files.createDirectories(basePath); + + Configuration hadoopConf = new Configuration(); + engine = DefaultEngine.create(hadoopConf); + + conversionTarget = + new DeltaKernelConversionTarget( + TargetTable.builder() + .name(tableName) + .basePath(basePath.toString()) + .metadataRetention(Duration.of(1, ChronoUnit.HOURS)) + .formatName(TableFormat.DELTA) + .build(), + engine); + } + + @Test + public void testCreateSnapshotControlFlow() throws Exception { + InternalSchema schema1 = getInternalSchema(); + List fields2 = new ArrayList<>(schema1.getFields()); + fields2.add( + InternalField.builder() + .name("float_field") + .schema( + InternalSchema.builder() + .name("float") + .dataType(InternalType.FLOAT) + .isNullable(true) + .build()) + .build()); + InternalSchema schema2 = 
getInternalSchema().toBuilder().fields(fields2).build(); + InternalTable table1 = getInternalTable(tableName, basePath, schema1, null, LAST_COMMIT_TIME); + InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME); + + InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), basePath); + InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath); + InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath); + + InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2); + InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3); + + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); + validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2))); + + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2); + validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3))); + } + + @Test + public void testFileRemovalWithCheckpoint() throws Exception { + // This test does 11 syncs to trigger checkpoint creation (happens at 10th commit) + // and verifies that file removal works correctly after checkpoint exists + String checkpointTableName = "test_table_checkpoint_" + UUID.randomUUID(); + Path checkpointTestPath = tempDir.resolve(checkpointTableName); + Files.createDirectories(checkpointTestPath); + + InternalSchema schema = getInternalSchema(); + InternalTable checkpointTable = + getInternalTable(checkpointTableName, checkpointTestPath, schema, null, LAST_COMMIT_TIME); + + DeltaKernelConversionTarget checkpointTarget = + new DeltaKernelConversionTarget( + TargetTable.builder() + .name(checkpointTableName) + .basePath(checkpointTestPath.toString()) + .metadataRetention(Duration.of(1, ChronoUnit.HOURS)) + .formatName(TableFormat.DELTA) + .build(), + engine); + + // Do 10 syncs to trigger checkpoint + for (int i = 
0; i < 10; i++) { + InternalDataFile file1 = getDataFile(i * 2 + 1, Collections.emptyList(), checkpointTestPath); + InternalDataFile file2 = getDataFile(i * 2 + 2, Collections.emptyList(), checkpointTestPath); + + InternalSnapshot snapshot = buildSnapshot(checkpointTable, String.valueOf(i), file1, file2); + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(checkpointTarget), snapshot); + } + + // 11th sync: This triggers checkpoint creation at version 10 + InternalDataFile file21 = getDataFile(21, Collections.emptyList(), checkpointTestPath); + InternalDataFile file22 = getDataFile(22, Collections.emptyList(), checkpointTestPath); + InternalSnapshot snapshot11 = buildSnapshot(checkpointTable, "10", file21, file22); + + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(checkpointTarget), snapshot11); + + // Checkpoint is created synchronously via post-commit hooks + Path checkpointFile = + checkpointTestPath.resolve("_delta_log/00000000000000000010.checkpoint.parquet"); + assertTrue(Files.exists(checkpointFile), "Checkpoint file should exist after 10 commits"); + + // 12th sync: NOW checkpoint exists and can be used to detect file removals + InternalDataFile file23 = getDataFile(23, Collections.emptyList(), checkpointTestPath); + InternalDataFile file24 = getDataFile(24, Collections.emptyList(), checkpointTestPath); + InternalSnapshot snapshot12 = buildSnapshot(checkpointTable, "11", file23, file24); + + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(checkpointTarget), snapshot12); + + // Validate: Should only have file23 and file24 (file21/file22 should be removed) + validateDeltaTable(checkpointTestPath, new HashSet<>(Arrays.asList(file23, file24))); + } + + @Test + public void testPrimitiveFieldPartitioning() throws Exception { + InternalSchema schema = getInternalSchema(); + InternalPartitionField internalPartitionField = + InternalPartitionField.builder() + .sourceField( + 
InternalField.builder() + .name("string_field") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .build()) + .build()) + .transformType(PartitionTransformType.VALUE) + .build(); + InternalTable table = + getInternalTable( + tableName, + basePath, + schema, + Collections.singletonList(internalPartitionField), + LAST_COMMIT_TIME); + + List partitionValues1 = + Collections.singletonList( + PartitionValue.builder() + .partitionField(internalPartitionField) + .range(Range.scalar("level")) + .build()); + List partitionValues2 = + Collections.singletonList( + PartitionValue.builder() + .partitionField(internalPartitionField) + .range(Range.scalar("warning")) + .build()); + InternalDataFile dataFile1 = getDataFile(1, partitionValues1, basePath); + InternalDataFile dataFile2 = getDataFile(2, partitionValues1, basePath); + InternalDataFile dataFile3 = getDataFile(3, partitionValues2, basePath); + + InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); + + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); + + // Validate all files are present + validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2, dataFile3))); + + // Verify partition columns are set + Table deltaTable = Table.forPath(engine, basePath.toString()); + Snapshot snapshot = deltaTable.getLatestSnapshot(engine); + SnapshotImpl snapshotImpl = (SnapshotImpl) snapshot; + Set partitionColumns = snapshotImpl.getMetadata().getPartitionColNames(); + assertEquals(1, partitionColumns.size()); + assertTrue(partitionColumns.contains("string_field")); + } + + @Test + public void testMultipleFieldPartitioning() throws Exception { + InternalSchema schema = getInternalSchema(); + InternalPartitionField internalPartitionField1 = + InternalPartitionField.builder() + .sourceField( + InternalField.builder() + .name("string_field") + .schema( + InternalSchema.builder() + 
.name("string") + .dataType(InternalType.STRING) + .build()) + .build()) + .transformType(PartitionTransformType.VALUE) + .build(); + InternalPartitionField internalPartitionField2 = + InternalPartitionField.builder() + .sourceField( + InternalField.builder() + .name("int_field") + .schema(InternalSchema.builder().name("int").dataType(InternalType.INT).build()) + .build()) + .transformType(PartitionTransformType.VALUE) + .build(); + InternalTable table = + getInternalTable( + tableName, + basePath, + schema, + Arrays.asList(internalPartitionField1, internalPartitionField2), + LAST_COMMIT_TIME); + + List partitionValues1 = + Arrays.asList( + PartitionValue.builder() + .partitionField(internalPartitionField1) + .range(Range.scalar("level")) + .build(), + PartitionValue.builder() + .partitionField(internalPartitionField2) + .range(Range.scalar(10)) + .build()); + List partitionValues2 = + Arrays.asList( + PartitionValue.builder() + .partitionField(internalPartitionField1) + .range(Range.scalar("level")) + .build(), + PartitionValue.builder() + .partitionField(internalPartitionField2) + .range(Range.scalar(20)) + .build()); + List partitionValues3 = + Arrays.asList( + PartitionValue.builder() + .partitionField(internalPartitionField1) + .range(Range.scalar("warning")) + .build(), + PartitionValue.builder() + .partitionField(internalPartitionField2) + .range(Range.scalar(20)) + .build()); + + InternalDataFile dataFile1 = getDataFile(1, partitionValues1, basePath); + InternalDataFile dataFile2 = getDataFile(2, partitionValues2, basePath); + InternalDataFile dataFile3 = getDataFile(3, partitionValues3, basePath); + + InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); + validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2, dataFile3))); + + // Verify partition columns + Table deltaTable = 
        Table.forPath(engine, basePath.toString());
+    Snapshot snapshot = deltaTable.getLatestSnapshot(engine);
+    SnapshotImpl snapshotImpl = (SnapshotImpl) snapshot;
+    Set<String> partitionColumns = snapshotImpl.getMetadata().getPartitionColNames();
+    assertEquals(2, partitionColumns.size());
+    assertTrue(partitionColumns.contains("string_field"));
+    assertTrue(partitionColumns.contains("int_field"));
+  }
+
+  @Test
+  @Disabled(
+      "Disabled due to tags not present in commitinfo - https://github.com/delta-io/delta/issues/6167")
+  public void testSourceTargetIdMapping() throws Exception {
+    InternalSchema baseSchema = getInternalSchema();
+    InternalTable sourceTable =
+        getInternalTable("source_table", basePath, baseSchema, null, LAST_COMMIT_TIME);
+
+    InternalDataFile sourceDataFile1 = getDataFile(101, Collections.emptyList(), basePath);
+    InternalDataFile sourceDataFile2 = getDataFile(102, Collections.emptyList(), basePath);
+    InternalDataFile sourceDataFile3 = getDataFile(103, Collections.emptyList(), basePath);
+
+    InternalSnapshot sourceSnapshot1 =
+        buildSnapshot(sourceTable, "0", sourceDataFile1, sourceDataFile2);
+    InternalSnapshot sourceSnapshot2 =
+        buildSnapshot(sourceTable, "1", sourceDataFile2, sourceDataFile3);
+
+    TableFormatSync.getInstance()
+        .syncSnapshot(Collections.singletonList(conversionTarget), sourceSnapshot1);
+    Optional<String> mappedTargetId1 =
+        conversionTarget.getTargetCommitIdentifier(sourceSnapshot1.getSourceIdentifier());
+    validateDeltaTable(basePath, new HashSet<>(Arrays.asList(sourceDataFile1, sourceDataFile2)));
+    assertTrue(mappedTargetId1.isPresent());
+    assertEquals("0", mappedTargetId1.get());
+
+    TableFormatSync.getInstance()
+        .syncSnapshot(Collections.singletonList(conversionTarget), sourceSnapshot2);
+    Optional<String> mappedTargetId2 =
+        conversionTarget.getTargetCommitIdentifier(sourceSnapshot2.getSourceIdentifier());
+    validateDeltaTable(basePath, new HashSet<>(Arrays.asList(sourceDataFile2, sourceDataFile3)));
+    assertTrue(mappedTargetId2.isPresent());
+    assertEquals("1", mappedTargetId2.get());
+
+    Optional<String> unmappedTargetId = conversionTarget.getTargetCommitIdentifier("s3");
+    assertFalse(unmappedTargetId.isPresent());
+  }
+
+  @Test
+  public void testGetTargetCommitIdentifierWithNullSourceIdentifier() throws Exception {
+    InternalSchema baseSchema = getInternalSchema();
+    InternalTable internalTable =
+        getInternalTable("source_table", basePath, baseSchema, null, LAST_COMMIT_TIME);
+    InternalDataFile sourceDataFile = getDataFile(101, Collections.emptyList(), basePath);
+    InternalSnapshot snapshot = buildSnapshot(internalTable, "0", sourceDataFile);
+
+    // Mock the snapshot sync process
+    conversionTarget.beginSync(internalTable);
+    TableSyncMetadata tableSyncMetadata =
+        TableSyncMetadata.of(
+            internalTable.getLatestCommitTime(), new ArrayList<>(snapshot.getPendingCommits()));
+    conversionTarget.syncMetadata(tableSyncMetadata);
+    conversionTarget.syncSchema(internalTable.getReadSchema());
+    conversionTarget.syncPartitionSpec(internalTable.getPartitioningFields());
+    conversionTarget.syncFilesForSnapshot(snapshot.getPartitionedDataFiles());
+    conversionTarget.completeSync();
+
+    // getTargetCommitIdentifier is not supported in DeltaKernelConversionTarget
+    // because Delta Kernel 4.0.0 does not support commit tags
+    NotSupportedException exception =
+        assertThrows(
+            NotSupportedException.class, () -> conversionTarget.getTargetCommitIdentifier("0"));
+    assertTrue(
+        exception
+            .getMessage()
+            .contains("Source-to-target commit identifier mapping is not supported"));
+  }
+
+  @Test
+  public void testGetTableMetadata() throws Exception {
+    InternalSchema schema = getInternalSchema();
+    InternalTable table = getInternalTable(tableName, basePath, schema, null, LAST_COMMIT_TIME);
+    InternalDataFile dataFile = getDataFile(1, Collections.emptyList(), basePath);
+    InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile);
+
+    TableFormatSync.getInstance()
+        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot);
+
+    Optional<TableSyncMetadata> metadata = conversionTarget.getTableMetadata();
+    assertTrue(metadata.isPresent(), "Metadata should be present after sync");
+    TableSyncMetadata syncMetadata = metadata.get();
+    assertNotNull(syncMetadata.getLastInstantSynced(), "Last instant synced should not be null");
+  }
+
+  private void validateDeltaTable(Path basePath, Set<InternalDataFile> expectedFiles)
+      throws IOException {
+    Table table = Table.forPath(engine, basePath.toString());
+    assertNotNull(table);
+
+    Snapshot snapshot = table.getLatestSnapshot(engine);
+    assertNotNull(snapshot);
+
+    // Scan all files
+    ScanImpl scan = (ScanImpl) snapshot.getScanBuilder().build();
+
+    Map<String, InternalDataFile> pathToFile =
+        expectedFiles.stream()
+            .collect(Collectors.toMap(InternalDataFile::getPhysicalPath, Function.identity()));
+
+    int count = 0;
+    try (CloseableIterator<FilteredColumnarBatch> scanFiles = scan.getScanFiles(engine, false)) {
+      while (scanFiles.hasNext()) {
+        FilteredColumnarBatch batch = scanFiles.next();
+        try (CloseableIterator<Row> rows = batch.getRows()) {
+          while (rows.hasNext()) {
+            Row scanFileRow = rows.next();
+            AddFile addFile =
+                new AddFile(scanFileRow.getStruct(scanFileRow.getSchema().indexOf("add")));
+
+            String fullPath =
+                new org.apache.hadoop.fs.Path(basePath.resolve(addFile.getPath()).toUri())
+                    .toString();
+            InternalDataFile expected = pathToFile.get(fullPath);
+            assertNotNull(expected, "Unexpected file in Delta table: " + fullPath);
+            assertEquals(addFile.getSize(), expected.getFileSizeBytes());
+            count++;
+          }
+        }
+      }
+    }
+
+    assertEquals(
+        expectedFiles.size(), count, "Number of files from Delta scan don't match expectation");
+  }
+
+  private InternalSnapshot buildSnapshot(
+      InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) {
+    return InternalSnapshot.builder()
+        .table(table)
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(Arrays.asList(dataFiles)))
+        .sourceIdentifier(sourceIdentifier)
+        .build();
+  }
+
+  private InternalTable getInternalTable(
+      String tableName,
+      Path basePath,
+      InternalSchema schema,
+      List<InternalPartitionField> partitionFields,
+      Instant lastCommitTime) {
+    return InternalTable.builder()
+        .name(tableName)
+        .basePath(basePath.toUri().toString())
+        .layoutStrategy(DataLayoutStrategy.FLAT)
+        .tableFormat(TableFormat.HUDI)
+        .readSchema(schema)
+        .partitioningFields(partitionFields)
+        .latestCommitTime(lastCommitTime)
+        .build();
+  }
+
+  private InternalDataFile getDataFile(
+      int index, List<PartitionValue> partitionValues, Path basePath) {
+    // Create actual physical file so Delta Kernel can reference it
+    try {
+      Path filePath = basePath.resolve("physical" + index + ".parquet");
+      Files.createFile(filePath);
+
+      String physicalPath = new org.apache.hadoop.fs.Path(filePath.toUri()).toString();
+
+      return InternalDataFile.builder()
+          .fileFormat(FileFormat.APACHE_PARQUET)
+          .fileSizeBytes(1000L + (index * 100L)) // Deterministic size based on index
+          .physicalPath(physicalPath)
+          .recordCount(100L + (index * 10L)) // Deterministic record count based on index
+          .partitionValues(partitionValues)
+          .columnStats(Collections.emptyList())
+          .lastModified(1000000000L + (index * 1000L)) // Deterministic timestamp based on index
+          .build();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to create test data file", e);
+    }
+  }
+
+  private InternalSchema getInternalSchema() {
+    Map<InternalSchema.MetadataKey, InternalSchema.MetadataValue> timestampMetadata =
+        new HashMap<>();
+    timestampMetadata.put(
+        InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MILLIS);
+    return InternalSchema.builder()
+        .dataType(InternalType.RECORD)
+        .name("top_level_schema")
+        .fields(
+            Arrays.asList(
+                InternalField.builder()
+                    .name("long_field")
+                    .schema(
+                        InternalSchema.builder()
+                            .name("long")
.dataType(InternalType.LONG) + .isNullable(true) + .build()) + .build(), + InternalField.builder() + .name("string_field") + .schema( + InternalSchema.builder() + .name("string") + .dataType(InternalType.STRING) + .isNullable(true) + .build()) + .build(), + InternalField.builder() + .name("int_field") + .schema( + InternalSchema.builder() + .name("int") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build(), + InternalField.builder() + .name("timestamp_field") + .schema( + InternalSchema.builder() + .name("time") + .dataType(InternalType.TIMESTAMP) + .isNullable(true) + .metadata(timestampMetadata) + .build()) + .build())) + .isNullable(false) + .build(); + } + + @Test + public void testTimestampNtz() throws Exception { + InternalSchema schema1 = getInternalSchemaWithTimestampNtz(); + List fields2 = new ArrayList<>(schema1.getFields()); + fields2.add( + InternalField.builder() + .name("float_field") + .schema( + InternalSchema.builder() + .name("float") + .dataType(InternalType.FLOAT) + .isNullable(true) + .build()) + .build()); + InternalSchema schema2 = getInternalSchema().toBuilder().fields(fields2).build(); + InternalTable table1 = getInternalTable(tableName, basePath, schema1, null, LAST_COMMIT_TIME); + InternalTable table2 = getInternalTable(tableName, basePath, schema2, null, LAST_COMMIT_TIME); + + InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), basePath); + InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), basePath); + InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), basePath); + + InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2); + InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3); + + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); + validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2))); + + TableFormatSync.getInstance() + 
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot2); + validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3))); + } + + private InternalSchema getInternalSchemaWithTimestampNtz() { + Map timestampMetadata = new HashMap<>(); + timestampMetadata.put( + InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS); + List fields = new ArrayList<>(getInternalSchema().getFields()); + fields.add( + InternalField.builder() + .name("timestamp_ntz_field") + .schema( + InternalSchema.builder() + .name("time_ntz") + .dataType(InternalType.TIMESTAMP_NTZ) + .isNullable(true) + .metadata(timestampMetadata) + .build()) + .build()); + return getInternalSchema().toBuilder().fields(fields).build(); + } +}