Conversation
@vaibhavk1992 can you update the PR description?
the-other-tim-brown
left a comment
@vaibhavk1992 can you complete a self-review of the code first to clean up a bit?
Merged 7 upstream commits:
- f991e31 Parquet Source: snapshot sync fixes (apache#806)
- 4307565 Parquet source: column stats support (apache#805)
- 5c25674 Remove wildcard imports and enforce with spotless (apache#809)
- fe7215e add .sdkmanrc to .gitignore (apache#793)
- abbf4b7 fix(iceberg): nested comments (apache#797)
- 8e58367 Remove redundant getSnapshotAt calls (apache#791)
- 8cab6a2 fix(delta): avoid NPE for binary in map/array (apache#795)

Resolved conflicts:
- TestDeltaKernelSchemaExtractor.java: kept StructField import needed for new tests
Fixed wildcard imports in Delta Kernel test files to comply with spotless rules enforced in upstream commit 5c25674.
The spotless:apply command removed wildcard imports but didn't add back all necessary specific imports. Added missing imports:

TestDeltaKernelReadWriteIntegration.java:
- Static assertions (assertEquals, assertTrue, assertFalse, assertNotNull)
- java.util.* (Random, UUID, List, Map, Set, Arrays, Collections, etc.)

TestDeltaKernelSync.java:
- Static assertions (including fail)
- java.util.* (Random, UUID, List, Map, Set, Arrays, Collections, etc.)

TestDeltaKernelDataFileUpdatesExtractor.java:
- Static assertions (assertEquals, assertTrue, assertFalse, assertNotNull)
- java.util.* (List, Arrays, Collections)

All tests now compile successfully.
I have tried to address all the comments @vinishjail97 @the-other-tim-brown
  }
} catch (Exception e) {
  // Log and continue to next commit
  log.warn(
log.warn("...", version, e.getMessage()) swallows the stack trace. Pass the exception as the last argument instead: log.warn("Failed to parse commit metadata for version {}", version, e). On-call engineers debugging a production issue won't know where the parse failure originated.
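A self-contained illustration of the difference. This sketch uses java.util.logging (stdlib) so it runs standalone rather than the project's SLF4J logger; the principle is the same — pass the exception object, not its message, as the final argument:

```java
import java.io.ByteArrayOutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import java.util.logging.StreamHandler;

public class LogThrowableDemo {
    // Logs a parse failure and returns the formatted log output for inspection
    public static String logParseFailure(long version) {
        Logger log = Logger.getLogger("LogThrowableDemo");
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        StreamHandler handler = new StreamHandler(buf, new SimpleFormatter());
        log.setUseParentHandlers(false);
        log.addHandler(handler);
        try {
            throw new IllegalStateException("bad commit json");
        } catch (Exception e) {
            // Passing the exception object (not e.getMessage()) keeps the stack trace
            log.log(Level.WARNING, "Failed to parse commit metadata for version " + version, e);
        }
        handler.flush();
        return buf.toString();
    }

    public static void main(String[] args) {
        String out = logParseFailure(42);
        // The exception class and stack frames appear in the output
        System.out.println(out.contains("IllegalStateException"));
        System.out.println(out.contains("logParseFailure"));
    }
}
```

With `log.warn("...", version, e.getMessage())` only the message string survives; the class name and frame list above are exactly what on-call engineers lose.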
try {
  Table table = Table.forPath(engine, basePath);
  this.latestSchema = table.getLatestSnapshot(engine).getSchema();
} catch (Exception e) {
catch (Exception e) silently sets this.latestSchema = null. If the error is a network issue, permissions error, or anything other than "table doesn't exist", this will silently proceed with a null schema and cause an NPE in addColumn() at line 366 (latestSchema.add(field)). Should catch only the specific "table not found" exception from Delta Kernel, not the broad Exception.
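A minimal pure-Java sketch of the narrow-catch pattern. The TableNotFoundException here is a stand-in class defined locally, not Delta Kernel's actual exception type, and the boolean flags simulate the failure modes:

```java
public class NarrowCatchDemo {
    // Stand-in for Delta Kernel's table-not-found exception type
    static class TableNotFoundException extends RuntimeException {
        TableNotFoundException(String msg) { super(msg); }
    }

    // Only the "table does not exist" case yields a null schema;
    // network/permission failures propagate to the caller
    static String loadLatestSchema(boolean tableMissing, boolean networkDown) {
        try {
            if (networkDown) {
                throw new RuntimeException("connection refused");
            }
            if (tableMissing) {
                throw new TableNotFoundException("no _delta_log at path");
            }
            return "schema";
        } catch (TableNotFoundException e) {
            return null; // expected for a brand-new table
        }
    }

    public static void main(String[] args) {
        System.out.println(loadLatestSchema(false, false)); // schema
        System.out.println(loadLatestSchema(true, false));  // null
        try {
            loadLatestSchema(false, true);
        } catch (RuntimeException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```

With the broad `catch (Exception e)` in the PR, the third case would also return null and surface later as an NPE far from the root cause.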
…Kernel integration
@vinishjail97 I have implemented all the above suggestions.
for (RowBackedAction action : actions) {

  if (action instanceof io.delta.kernel.internal.actions.AddFile) {
    io.delta.kernel.internal.actions.AddFile addFile =
Can these classes from the delta kernel be imported?
DeltaKernelDataFileUpdatesExtractor.builder()
    .engine(engine)
    .basePath(targetTable.getBasePath())
    // Column statistics are not needed for conversion operations
When we convert from one format to another, we actually do want the statistics. This allows query engines to leverage them for their planning operations for improved efficiency.
if (action instanceof AddFile) {
  AddFile addFile = (AddFile) action;
  Row wrappedRow =
      io.delta.kernel.internal.actions.SingleAction.createAddFileSingleAction(
Let's start using imports throughout the PR please. Do a sanity check of the files and make sure you are using them throughout. Highlighting every line leads to a lot of noise on the reviews.
public static boolean tableExists(Engine engine, String basePath) {
  try {
    Table table = Table.forPath(engine, basePath);
    table.getLatestSnapshot(engine);
This looks like it will load the snapshot, is there a more lightweight way to do this?
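One lighter-weight option, sketched below for local filesystems only, is to probe for a non-empty _delta_log directory instead of replaying the log. This is an assumption about how existence could be checked, not the Kernel API's own mechanism, and object-store paths would need a filesystem abstraction such as Hadoop's FileSystem instead of java.io.File:

```java
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;

public class DeltaLogProbeDemo {
    // A Delta table's presence is signalled by a non-empty _delta_log directory;
    // checking for it avoids a full snapshot/log replay. Local-filesystem only.
    public static boolean deltaLogExists(String basePath) {
        File log = new File(basePath, "_delta_log");
        File[] entries = log.listFiles();
        return log.isDirectory() && entries != null && entries.length > 0;
    }

    public static void main(String[] args) throws Exception {
        Path table = Files.createTempDirectory("tbl");
        System.out.println(deltaLogExists(table.toString())); // false: no log yet
        Path logDir = Files.createDirectories(table.resolve("_delta_log"));
        Files.createFile(logDir.resolve("00000000000000000000.json"));
        System.out.println(deltaLogExists(table.toString())); // true
    }
}
```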
DeltaKernelSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
}

// ========== Tests for fromInternalSchema() - New Tests ==========
Remove this comment for New Tests?
// ========== Tests for fromInternalSchema() - New Tests ==========

@Test
Let's have tests with nested fields, lists and maps as well
// Verify we have AddFile actions
boolean hasAddFile = actionList.stream().anyMatch(action -> action instanceof AddFile);
assertTrue(hasAddFile, "Should contain AddFile actions");
Can we assert on the content of the AddFile to make sure it is aligned with our expectations?
actionList.stream().filter(action -> action instanceof RemoveFile).count();

// Verify: Should have AddFile for file3 (new file)
assertTrue(addFileCount >= 1, "Should have at least 1 AddFile action for new file (file3)");
These counts should be strict. Only 1 file is expected to be added and 1 removed
… type tests
- Enhanced TestDeltaKernelDataFileUpdatesExtractor with detailed AddFile content assertions
- Added strict count verification (== instead of >=) for differential sync tests
- Fixed path format inconsistency (Hadoop URI vs plain string) in test files
- Added 3 comprehensive tests for fromInternalSchema: nested records, lists, and maps
- Simplified test code by inlining nested schema builds with clear structural comments
- Fixed applyDiff signature and improved DeltaKernelDataFileUpdatesExtractor
- Added DeltaKernelUtils.tableExists helper method
- All 19 tests passing (16 schema + 3 data file updater tests)

Test coverage now includes:
- Multi-level nested structures (3 levels deep)
- Lists of primitives and complex types
- Maps of primitives and complex types
- Round-trip conversions with complex types
- Strict assertions on AddFile/RemoveFile actions

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
if (!tableExists) {
  File tableDir = new File(basePath);
  if (!tableDir.exists()) {
    tableDir.mkdirs();
new File(basePath).mkdirs() only works for local filesystem paths. For S3/GCS/Azure/HDFS paths (e.g., s3://bucket/path), this silently fails or creates nonsensical local directories.
Consider removing this block — Table.forPath() with CREATE_TABLE operation should handle table directory creation. Or use Hadoop FileSystem.mkdirs() if explicit creation is needed.
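If explicit creation must stay, a guard like the following makes the local-only limitation of java.io.File visible; isLocalPath is a hypothetical helper illustrating the scheme check, not existing project code:

```java
import java.net.URI;

public class LocalPathCheckDemo {
    // java.io.File only understands local paths; object-store URIs must go
    // through a filesystem abstraction such as Hadoop's FileSystem.mkdirs()
    public static boolean isLocalPath(String basePath) {
        String scheme = URI.create(basePath).getScheme();
        return scheme == null || scheme.equals("file");
    }

    public static void main(String[] args) {
        System.out.println(isLocalPath("/tmp/warehouse/table"));        // true
        System.out.println(isLocalPath("file:///tmp/warehouse/table")); // true
        System.out.println(isLocalPath("s3://bucket/warehouse/table")); // false
    }
}
```

Without such a guard, `new File("s3://bucket/path").mkdirs()` quietly creates a local directory literally named "s3:" under the working directory.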
RemoveFile removeFile =
    new RemoveFile(addFile.toRemoveFileRow(false, Optional.of(snapshot.getVersion())));
String fullPath =
    DeltaKernelActionsConverter.getFullPathToFile(removeFile.getPath(), table);
Performance concern: DeltaKernelActionsConverter.getFullPathToFile() creates a new Configuration + DefaultEngine per call. In applySnapshot, this is invoked per-file in the existing snapshot — potentially thousands of times. Each new Configuration() scans the classpath.
Consider accepting an Engine parameter, or computing the full path directly from basePath + removeFile.getPath() without constructing a new engine each time.
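A sketch of the second suggestion: resolve the path with plain string/URI logic instead of constructing a Configuration and Engine per call. fullPathToFile is a hypothetical helper, and it skips any URL-decoding the real converter may perform:

```java
import java.net.URI;

public class FullPathDemo {
    // Resolve an action's (possibly relative) file path against the table base
    // path without any per-call engine or Hadoop Configuration construction
    public static String fullPathToFile(String basePath, String filePath) {
        if (URI.create(filePath).isAbsolute() || filePath.startsWith("/")) {
            return filePath; // already absolute, pass through
        }
        return basePath.endsWith("/") ? basePath + filePath : basePath + "/" + filePath;
    }

    public static void main(String[] args) {
        // relative path resolved against the base
        System.out.println(fullPathToFile("s3://bucket/table", "part-0001.parquet"));
        // absolute path (e.g. shallow-cloned file) passes through unchanged
        System.out.println(fullPathToFile("s3://bucket/table", "s3://other/part-0002.parquet"));
    }
}
```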
try {
  Table table = Table.forPath(engine, basePath);
  this.latestSchema = table.getLatestSnapshot(engine).getSchema();
Multiple full snapshot loads per sync cycle: the constructor loads the snapshot here for schema, commitTransaction() calls checkTableExists() which loads another snapshot (line 470 -> DeltaKernelUtils.tableExists), and syncFilesForSnapshot/syncFilesForDiff load yet another (lines 268, 280). That's 3-4 full log replays per sync.
Consider caching the snapshot (or at least the tableExists result) within a TransactionState lifecycle.
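A minimal memoization sketch of the caching idea. TransactionState and the supplier are hypothetical stand-ins for the real snapshot-loading call; the point is that the expensive load runs at most once per sync cycle:

```java
import java.util.function.Supplier;

public class MemoizedExistsDemo {
    // Caches the result of an expensive existence check (a full snapshot load)
    // for the lifetime of one transaction/sync cycle
    static class TransactionState {
        private final Supplier<Boolean> expensiveCheck;
        private Boolean tableExists; // null until first use

        TransactionState(Supplier<Boolean> expensiveCheck) {
            this.expensiveCheck = expensiveCheck;
        }

        boolean tableExists() {
            if (tableExists == null) {
                tableExists = expensiveCheck.get(); // snapshot loaded at most once
            }
            return tableExists;
        }
    }

    public static void main(String[] args) {
        int[] loads = {0};
        TransactionState state = new TransactionState(() -> { loads[0]++; return true; });
        state.tableExists();
        state.tableExists();
        state.tableExists();
        System.out.println(loads[0]); // 1: the expensive check ran only once
    }
}
```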
return DeltaKernelUtils.tableExists(engine, basePath);
}

private Map<String, String> getConfigurationsForDeltaSync() {
getConfigurationsForDeltaSync() does not set minReaderVersion/minWriterVersion, unlike the existing DeltaConversionTarget. Protocol versions default to Delta Kernel defaults, which may not match features used (e.g., generated columns require writer version 4). Is this intentional?
private final Engine engine;
private final String basePath;
private final boolean includeColumnStats;
includeColumnStats field is declared but never read — createAddFileAction always passes Optional.empty() for stats (line 177). Either implement stats conversion or remove the dead field. If deferred, add a // TODO(issue-link) so it doesn't get forgotten.
Optional.empty(), // tags
Optional.empty(), // baseRowId
Optional.empty(), // defaultRowCommitVersion
Optional.empty() // stats - TODO: convert column stats to DataFileStatistics
Stats are always Optional.empty(). The synced Delta table will have no column statistics on AddFile entries, degrading data skipping for downstream readers (Spark, Trino). Is there a tracking issue for adding stats support?
@@ -265,22 +265,27 @@ private void collectUnsupportedStats(Map<String, Object> additionalStats) {
*/
This class is ~300 lines of near-identical duplication from DeltaStatsExtractor in the Standalone package. The only material difference is the AddFile import. Consider extracting shared stats serialization logic (convertStatsToDeltaFormat, insertValueAtPath, flattenStatMap, DeltaStats) into a common utility to avoid maintaining two copies.
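Of the helpers named above, flattenStatMap is the most clearly format-agnostic. A hedged sketch of what a shared version might look like; the signature and behavior here are assumptions for illustration, not the existing implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StatsFlattenDemo {
    // Flattens nested per-column stats (e.g. minValues for struct columns)
    // into dotted keys; it depends on neither Standalone nor Kernel types,
    // so both extractors could share one copy
    @SuppressWarnings("unchecked")
    public static Map<String, Object> flattenStatMap(String prefix, Map<String, Object> stats) {
        Map<String, Object> flat = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : stats.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                flat.putAll(flattenStatMap(key, (Map<String, Object>) e.getValue()));
            } else {
                flat.put(key, e.getValue());
            }
        }
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Object> nested = new LinkedHashMap<>();
        nested.put("id", 1);
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("zip", 94105);
        nested.put("address", address);
        System.out.println(flattenStatMap("", nested)); // {id=1, address.zip=94105}
    }
}
```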
// Scan all files
ScanImpl scan = (ScanImpl) snapshot.getScanBuilder().build();
CloseableIterator<FilteredColumnarBatch> scanFiles = scan.getScanFiles(engine, false);
validateDeltaTable uses CloseableIterator<FilteredColumnarBatch> (line 440) and CloseableIterator<Row> (line 449) that are never closed. Use try-with-resources to prevent resource leaks:
try (CloseableIterator<FilteredColumnarBatch> scanFiles = scan.getScanFiles(engine, false)) {
  // ...
}

assertTrue(
    addFile.getPath().contains("test_data.parquet"),
    "AddFile path should contain the test file name");
assertTrue(addFile.getSize() > 0, "AddFile size should be greater than 0");
nit: This asserts addFile.getSize() > 0 but the test creates expectedFileSize = 1024L (line 128). Use assertEquals(1024L, addFile.getSize()) to validate size propagation — otherwise a wrong size still passes.
// Verify schema
InternalSchema readSchema = readTable.getReadSchema();
assertNotNull(readSchema);
assertEquals(schema.getFields().size(), readSchema.getFields().size());
nit: This only checks field count, not content. Use assertEquals(schema, readSchema) for full structural comparison — otherwise schema corruption that preserves field count goes undetected.
What is the purpose of the pull request
This PR migrates XTable's Delta Lake integration from Delta Standalone to Delta Kernel for writers.
Brief change log
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)