-
Notifications
You must be signed in to change notification settings - Fork 200
Parquet Incremental Sync #768
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e541a71
15e282a
2ee71c9
6032e5f
f6fdc72
a94c3f3
f8bdbfe
c04a983
5f2541e
47e7076
fbb09ec
fe19a60
da7f300
a8730b7
d19ccbf
aecb204
0ec8cbb
9cb75df
9e125f2
233ca77
b4cba5a
a564b29
d1ceafb
649250f
24ff828
463d2ee
71d1c34
3319a91
013ffe4
b939a0a
b6d8ddc
c18ab1c
b7c613e
2a75f49
f1538b0
cdedeae
e873120
4d0f245
219656e
ff809d7
e06368f
7cdccd0
b919146
8ff5aa6
db9d7ab
324d703
3c453e6
e4c0b4c
4315282
1417929
7620c01
e458d72
9947f1b
cd66151
4e4c5cb
212e50a
5ea0b5a
64a5a2d
2135ccf
e76ac21
9901c1b
a2bb6c3
2d23f31
ceb924c
5b0a9fd
80e1927
0aedd70
5cbeeeb
13e5a65
fa77646
e53fa4b
a6b75d5
bdc9f40
43d986d
5f67a0f
dae07b7
9e0de0a
8c1b8f6
84bae49
6ce28a3
cf2564c
6cb87ee
5c25093
17a3134
5b0d8dc
8db84a0
b8e0d16
fdaeb4d
b2f214d
913ac29
9e243a1
a7412c3
c028112
089e869
f1f8be1
6588d1d
da9e401
15567ab
119b68c
ca49c63
9711135
a62a24c
9c598ab
2020a84
f4f9e9e
4fb91cf
f0ebcf6
4d636e0
6e3b5aa
5364315
9e27f44
9c0ca1d
51e8dd9
f46305d
5733dbe
d69d944
2e826e1
7975980
292bfbe
d7076f6
daae5f2
df448cf
ac3f534
47d0d82
1d70041
1a18297
a8f713d
350b74f
75fcdde
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.xtable.parquet; | ||
|
|
||
| import java.io.IOException; | ||
| import java.net.URI; | ||
| import java.util.Comparator; | ||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
|
|
||
| import lombok.AccessLevel; | ||
| import lombok.Getter; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.extern.log4j.Log4j2; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.FileSystem; | ||
| import org.apache.hadoop.fs.LocatedFileStatus; | ||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.hadoop.fs.RemoteIterator; | ||
| import org.apache.hadoop.util.functional.RemoteIterators; | ||
|
|
||
| import org.apache.xtable.exception.ReadException; | ||
|
|
||
| /** | ||
| * Manages Parquet File's Metadata | ||
| * | ||
| * <p>This class provides functions to handle Parquet metadata, creating metadata objects from | ||
| * parquet files and filtering the files based on the modification times. | ||
| */ | ||
| @Log4j2 | ||
| @RequiredArgsConstructor | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit — exposing both the Lombok-generated 3-arg ctor (
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll leave the rest of your comments here to somebody else to do them. |
||
| public class ParquetDataManager { | ||
|
sapienza88 marked this conversation as resolved.
|
||
| private final Configuration hadoopConf; | ||
| private final String basePath; | ||
| private final FileSystem fileSystem; | ||
|
|
||
| public ParquetDataManager(Configuration hadoopConf, String basePath) { | ||
| this.hadoopConf = hadoopConf; | ||
| this.basePath = basePath; | ||
| try { | ||
| URI uri = new Path(basePath).toUri(); | ||
| this.fileSystem = FileSystem.get(uri, hadoopConf); | ||
| } catch (IOException e) { | ||
| throw new ReadException("Unable to initialize file system for base path: " + basePath, e); | ||
| } | ||
| } | ||
|
|
||
| @Getter(value = AccessLevel.PRIVATE, lazy = true) | ||
| private final List<LocatedFileStatus> parquetFiles = loadParquetFiles(); | ||
|
|
||
| ParquetFileInfo getMostRecentParquetFile() { | ||
| LocatedFileStatus file = | ||
| getParquetFiles().stream() | ||
| .max(Comparator.comparing(LocatedFileStatus::getModificationTime)) | ||
| .orElseThrow(() -> new IllegalStateException("No files found")); | ||
| return new ParquetFileInfo(hadoopConf, file); | ||
| } | ||
|
|
||
| ParquetFileInfo getParquetDataFileAt(long targetTime) { | ||
| return getParquetFiles().stream() | ||
| .filter(file -> file.getModificationTime() > targetTime) | ||
| .min(Comparator.comparing(LocatedFileStatus::getModificationTime)) | ||
| .map(file -> new ParquetFileInfo(hadoopConf, file)) | ||
| .orElseThrow(() -> new IllegalStateException("No file found at or after " + targetTime)); | ||
| } | ||
|
|
||
| private List<LocatedFileStatus> loadParquetFiles() { | ||
| try { | ||
| RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path(basePath), true); | ||
| return RemoteIterators.toList(iterator).stream() | ||
| .filter(file -> file.getPath().getName().endsWith("parquet")) | ||
| .collect(Collectors.toList()); | ||
| } catch (IOException e) { | ||
| throw new ReadException("Unable to read files from file system", e); | ||
| } | ||
| } | ||
|
|
||
| Stream<ParquetFileInfo> getCurrentFilesInfo() { | ||
| return getParquetFiles().stream() | ||
| .map(fileStatus -> new ParquetFileInfo(hadoopConf, fileStatus)); | ||
| } | ||
|
|
||
| List<ParquetFileInfo> getParquetFilesMetadataAfterTime(long syncTime) { | ||
| return getParquetFiles().stream() | ||
| .filter(file -> file.getModificationTime() > syncTime) | ||
| .map(file -> new ParquetFileInfo(hadoopConf, file)) | ||
| .collect(Collectors.toList()); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.xtable.parquet; | ||
|
|
||
| import lombok.Getter; | ||
| import lombok.Value; | ||
| import lombok.extern.log4j.Log4j2; | ||
|
|
||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.fs.FileStatus; | ||
| import org.apache.hadoop.fs.Path; | ||
| import org.apache.parquet.hadoop.metadata.ParquetMetadata; | ||
|
|
||
| @Log4j2 | ||
| @Value | ||
| class ParquetFileInfo { | ||
| @Getter(lazy = true) | ||
| ParquetMetadata metadata = getMetadataInternal(); | ||
|
|
||
| long modificationTime; | ||
| long size; | ||
| Path path; | ||
| Configuration conf; | ||
|
|
||
| public ParquetFileInfo(Configuration conf, FileStatus file) { | ||
| this.modificationTime = file.getModificationTime(); | ||
| this.path = file.getPath(); | ||
| this.size = file.getLen(); | ||
| this.conf = conf; | ||
| } | ||
|
|
||
| private ParquetMetadata getMetadataInternal() { | ||
| return ParquetMetadataExtractor.getInstance().readParquetMetadata(conf, path); | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.