Conversation
… into the parquet table
…ds, interfacing with ConversionSource
I can do a first review of this @the-other-tim-brown @vinishjail97
try (ParquetWriter<Group> writer =
    new ParquetWriter<Group>(
        outputFile,
        new GroupWriteSupport(),
        parquetFileConfig.getCodec(),
        (int) parquetFileConfig.getRowGroupSize(),
        pageSize,
        pageSize, // dictionaryPageSize
        true, // enableDictionary
        false, // enableValidation
        ParquetWriter.DEFAULT_WRITER_VERSION,
        conf)) {
  Group currentGroup = null;
  while ((currentGroup = (Group) reader.read()) != null) {
    writer.write(currentGroup);
Why are we writing new parquet files again like this through the writer? I think there's some misunderstanding of the parquet incremental sync feature here.
Parquet Incremental Sync Requirements.
- You have a target table where parquet files [p1/f1.parquet, p1/f2.parquet, p2/f1.parquet] have been synced to hudi, iceberg and delta for example.
- In the source, some changes have been made: a new file was added in partition p1 and p2's file was deleted. The incremental sync should now pick up these changes incrementally.

@sapienza88 It's better to align on the approach first before pushing PRs. Can you add the approach for parquet incremental sync to the PR description, or to a Google doc if possible?
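To make the expected behaviour concrete, the change log between two syncs can be sketched as a plain set difference over file listings. This is a hypothetical illustration (string paths instead of the Hadoop `FileStatus` objects the real source would use), not code from the PR:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class IncrementalDiff {

  // Hypothetical sketch: given the set of files already synced at the last
  // instant and the set of files currently in storage, derive the change log
  // purely at the metadata level (no parquet data is read or rewritten).
  static Map<String, Set<String>> diff(Set<String> synced, Set<String> current) {
    Set<String> added = new HashSet<>(current);
    added.removeAll(synced); // files present now but not synced yet
    Set<String> removed = new HashSet<>(synced);
    removed.removeAll(current); // files synced before but deleted since
    Map<String, Set<String>> changes = new HashMap<>();
    changes.put("added", added);
    changes.put("removed", removed);
    return changes;
  }

  public static void main(String[] args) {
    // The example from the requirements: p1 gains a file, p2's file is deleted.
    Set<String> synced = new HashSet<>(
        Arrays.asList("p1/f1.parquet", "p1/f2.parquet", "p2/f1.parquet"));
    Set<String> current = new HashSet<>(
        Arrays.asList("p1/f1.parquet", "p1/f2.parquet", "p1/f3.parquet"));
    Map<String, Set<String>> changes = diff(synced, current);
    System.out.println(changes.get("added"));   // [p1/f3.parquet]
    System.out.println(changes.get("removed")); // [p2/f1.parquet]
  }
}
```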
@sapienza88 XTable shouldn't be writing any new data or parquet files; it operates at a metadata level. Can you see this comment for reference?
#550 (comment)
Fetch the parquet files that have been added since the last syncInstant to retrieve the change log. We can do this via the same list call; filtering files based on their creationTime is the simplest way, but it's expensive.
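A minimal sketch of that list-and-filter idea, using `java.nio` so it stays self-contained; the actual ConversionSource would use Hadoop's `FileSystem.listFiles` and `FileStatus.getModificationTime()`, and `parquetFilesModifiedAfter` is a hypothetical helper name:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ModTimeFilter {

  // Hypothetical sketch of the single-list-call idea: list everything under
  // the table root once, then keep only parquet files whose modification time
  // is newer than the last sync instant.
  static List<Path> parquetFilesModifiedAfter(Path tableRoot, long lastSyncInstantMillis)
      throws IOException {
    try (Stream<Path> files = Files.walk(tableRoot)) {
      return files
          .filter(p -> p.toString().endsWith(".parquet"))
          .filter(p -> {
            try {
              return Files.getLastModifiedTime(p).toMillis() > lastSyncInstantMillis;
            } catch (IOException e) {
              return false; // file vanished between listing and stat
            }
          })
          .collect(Collectors.toList());
    }
  }
}
```

The cost concern above is the listing itself: this still walks the whole table on every sync, which is the "expensive" part being discussed.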
@vinishjail97 thanks for the suggestion, but that doesn't help yet. Could you elaborate on that idea, and on how the metadata alone could be managed for the task of retrieving data from a particular (modification) date? At the very least, the current ConversionSource wasn't coded with that in mind.
@vinishjail97 I added some comments on the functions so that the approach is clearer. All of the above suggestions were also taken into account in my last commit.

XTable shouldn't be writing any new data or parquet files; it operates at a metadata level. Can you see this comment for reference? I had written a few approaches on how to do incremental parquet sync.
@sapienza88 I'm adding a more detailed design and a class-level structure to unblock this PR.

Design Principle / Architecture

Use the file modification time as the commit identifier; this lets you identify which files have been synced and which haven't. The files that have not been synced yet need metadata generated for them. Future functionality, such as optimizations and handling parquet files deleted from storage, can be added incrementally; I'm hoping to keep the scope of this PR small.
…ds using the FileStatus' modifTime attribute
…ificationTime selector
…ppend and 2) filter for sync
force-pushed from c94066a to 2020a84
  }
}

@Test
Line 184 needs to be updated to include INCREMENTAL as well.
 * parquet files and filtering the files based on the modification times.
 */
@Log4j2
@RequiredArgsConstructor
Nit — exposing both the Lombok-generated 3-arg ctor (@RequiredArgsConstructor) and an explicit 2-arg ctor creates ambiguity about the public API. Production code calls the 2-arg form; tests call the 3-arg form. Consider dropping @RequiredArgsConstructor and either annotating the 3-arg ctor @VisibleForTesting or using a package-private static factory method for tests.
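The shape being suggested could look roughly like this. The class and field names are assumptions for illustration, not taken from the PR; the point is one public production constructor plus a package-private constructor that tests use (which would carry `@VisibleForTesting` in the real codebase):

```java
// Hypothetical sketch of the suggested constructor layout.
public class ParquetConversionSource {
  private final String tableName;
  private final String basePath;
  private final long lastSyncInstant;

  // Production entry point: a fresh source starts with no prior sync instant.
  public ParquetConversionSource(String tableName, String basePath) {
    this(tableName, basePath, 0L);
  }

  // Package-private, test-only: lets tests pin a specific lastSyncInstant.
  // In the real codebase this would be annotated @VisibleForTesting.
  ParquetConversionSource(String tableName, String basePath, long lastSyncInstant) {
    this.tableName = tableName;
    this.basePath = basePath;
    this.lastSyncInstant = lastSyncInstant;
  }

  long getLastSyncInstant() {
    return lastSyncInstant;
  }
}
```

This keeps the public API to a single constructor while still giving tests the extra knob.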
I'll leave the rest of your comments here for somebody else to address.
RowFactory.create(103, "BA", 2027, 11));

Dataset<Row> dfInit = sparkSession.createDataFrame(data, schema);
Path fixedPath = Paths.get("target", "fixed-parquet-data", "parquet_table_test_2");
Relative path Paths.get("target", "fixed-parquet-data", "parquet_table_test_2") pollutes the workspace across test runs, isn't cleaned up, and makes the test order-dependent when re-run without ./gradlew clean. This class already uses the @TempDir pattern — please use it here too. Also drop the commented-out // String outputPath = fixedPath.toString(); on line 457.
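For reference, the isolated-directory pattern can be sketched with the JDK alone; JUnit 5's `@TempDir` injects an equivalent directory into the test and additionally deletes it after the test completes:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TempDirExample {

  // JDK-only sketch of the isolated-directory pattern: a fresh directory per
  // run instead of a fixed path under target/, so reruns don't see stale
  // output from a previous test execution.
  static Path createIsolatedOutputDir() throws IOException {
    Path dir = Files.createTempDirectory("parquet_table_test_");
    dir.toFile().deleteOnExit(); // best-effort cleanup on JVM exit
    return dir;
  }
}
```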
assertNotNull(result);
List<ParquetFileInfo> fileList = result.collect(Collectors.toList());
assertEquals(3, fileList.size());
assertEquals(1000L, fileList.get(0).getModificationTime());
Nit — asserting positional ordering here relies on RemoteIterator + Collectors.toList() preserving the mock insertion order. In production, FS listing order is platform-dependent and not guaranteed. Either sort inside getCurrentFilesInfo() (and document the ordering contract) or switch these assertions to Set<Long> comparisons. Same pattern applies to testGetParquetFilesMetadataAfterTime_someMatch and _exactTimeMatch.
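The order-independent variant could look like this (`sameModificationTimes` is a hypothetical helper; in the test under review, `actual` would come from mapping `ParquetFileInfo::getModificationTime` over the collected file list):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

public class OrderIndependentAssertions {

  // Compare modification times as sets so the assertion no longer depends on
  // the platform-specific order in which the filesystem lists files.
  static boolean sameModificationTimes(List<Long> actual, Long... expected) {
    return new HashSet<>(actual).equals(new HashSet<>(Arrays.asList(expected)));
  }
}
```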
… ParquetDataManager and adjusted operator for mostRecentFile + spotless
… ParquetDataManager and adjusted operator for mostRecentFile + spotless + import error fixed
What is the purpose of the pull request
Adds incremental syncing ability to the ParquetSource
Brief change log
Verify this pull request