3030import java .util .concurrent .TimeUnit ;
3131import org .apache .iceberg .BaseTable ;
3232import org .apache .iceberg .HasTableOperations ;
33+ import org .apache .iceberg .PartitionStatisticsFile ;
3334import org .apache .iceberg .RewriteTablePathUtil ;
3435import org .apache .iceberg .RewriteTablePathUtil .RewriteResult ;
3536import org .apache .iceberg .Snapshot ;
@@ -144,8 +145,9 @@ private void validateInputs() {
144145 "Source prefix cannot be the same as target prefix (%s)" , sourcePrefix ));
145146 }
146147
147- validateAndSetEndVersion ();
148- validateAndSetStartVersion ();
148+ TableMetadata tableMetadata = ((HasTableOperations ) table ).operations ().current ();
149+ validateAndSetEndVersion (tableMetadata );
150+ validateAndSetStartVersion (tableMetadata );
149151
150152 if (stagingDir == null ) {
151153 stagingDir =
@@ -158,9 +160,7 @@ private void validateInputs() {
158160 }
159161 }
160162
161- private void validateAndSetEndVersion () {
162- TableMetadata tableMetadata = ((HasTableOperations ) table ).operations ().current ();
163-
163+ private void validateAndSetEndVersion (TableMetadata tableMetadata ) {
164164 if (endVersionName == null ) {
165165 Objects .requireNonNull (
166166 tableMetadata .metadataFileLocation (), "Metadata file location should not be null" );
@@ -170,9 +170,7 @@ private void validateAndSetEndVersion() {
170170 }
171171 }
172172
173- private void validateAndSetStartVersion () {
174- TableMetadata tableMetadata = ((HasTableOperations ) table ).operations ().current ();
175-
173+ private void validateAndSetStartVersion (TableMetadata tableMetadata ) {
176174 if (startVersionName != null ) {
177175 this .startVersionName = validateVersion (tableMetadata , startVersionName );
178176 }
@@ -182,11 +180,12 @@ private String validateVersion(TableMetadata tableMetadata, String versionFileNa
182180 String versionFile = null ;
183181 if (versionInFilePath (tableMetadata .metadataFileLocation (), versionFileName )) {
184182 versionFile = tableMetadata .metadataFileLocation ();
185- }
186-
187- for (MetadataLogEntry log : tableMetadata .previousFiles ()) {
188- if (versionInFilePath (log .file (), versionFileName )) {
189- versionFile = log .file ();
183+ } else {
184+ for (MetadataLogEntry log : tableMetadata .previousFiles ()) {
185+ if (versionInFilePath (log .file (), versionFileName )) {
186+ versionFile = log .file ();
187+ break ;
188+ }
190189 }
191190 }
192191
@@ -209,15 +208,15 @@ private boolean versionInFilePath(String path, String version) {
209208 private String rebuildMetadata () {
210209 //TODO need to implement rewrite of manifest list , manifest files and position delete files.
211210 TableMetadata startMetadata = startVersionName != null
212- ? ((HasTableOperations ) newStaticTable (startVersionName , table .io ()))
213- .operations ()
214- .current ()
215- : null ;
211+ ? ((HasTableOperations ) newStaticTable (startVersionName , table .io ()))
212+ .operations ()
213+ .current ()
214+ : null ;
216215 TableMetadata endMetadata =
217- ((HasTableOperations ) newStaticTable (endVersionName , table .io ())).operations ().current ();
216+ ((HasTableOperations ) newStaticTable (endVersionName , table .io ())).operations ().current ();
218217
219- if ( endMetadata .partitionStatisticsFiles () != null
220- && !endMetadata . partitionStatisticsFiles () .isEmpty ()) {
218+ List < PartitionStatisticsFile > partitionStats = endMetadata .partitionStatisticsFiles ();
219+ if ( partitionStats != null && !partitionStats .isEmpty ()) {
221220 throw new IllegalArgumentException ("Partition statistics files are not supported yet." );
222221 }
223222
@@ -238,7 +237,7 @@ private String saveFileList(Set<Pair<String, String>> filesToMove) {
238237
239238 private void writeAsCsv (Set <Pair <String , String >> rows , OutputFile outputFile ) {
240239 try (BufferedWriter writer = new BufferedWriter (
241- new OutputStreamWriter (outputFile .createOrOverwrite (), StandardCharsets .UTF_8 ))) {
240+ new OutputStreamWriter (outputFile .createOrOverwrite (), StandardCharsets .UTF_8 ))) {
242241 for (Pair <String , String > pair : rows ) {
243242 writer .write (String .join ("," , pair .first (), pair .second ()));
244243 writer .newLine ();
@@ -275,11 +274,12 @@ private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) {
275274
276275 private Set <Pair <String , String >> rewriteVersionFile (TableMetadata metadata , String versionFilePath ) {
277276 Set <Pair <String , String >> result = new HashSet <>();
278- String stagingPath =
279- RewriteTablePathUtil . stagingPath ( versionFilePath , sourcePrefix , stagingDir );
277+ String stagingPath = RewriteTablePathUtil . stagingPath ( versionFilePath , sourcePrefix , stagingDir );
278+
280279 System .out .println ("Processing version file " + versionFilePath );
281280 TableMetadata newTableMetadata = RewriteTablePathUtil .replacePaths (metadata , sourcePrefix , targetPrefix );
282281 TableMetadataParser .overwrite (newTableMetadata , table .io ().newOutputFile (stagingPath ));
282+
283283 result .add (Pair .of (stagingPath , RewriteTablePathUtil .newPath (versionFilePath , sourcePrefix , targetPrefix )));
284284 result .addAll (statsFileCopyPlan (metadata .statisticsFiles (), newTableMetadata .statisticsFiles ()));
285285
@@ -301,7 +301,7 @@ private Set<Pair<String, String>> statsFileCopyPlan(List<StatisticsFile> beforeS
301301 StatisticsFile before = beforeStats .get (i );
302302 StatisticsFile after = afterStats .get (i );
303303 if (before .fileSizeInBytes () != after .fileSizeInBytes ()) {
304- throw new IllegalArgumentException ("Before and after path rewrite, statistic files count should be same" );
304+ throw new IllegalArgumentException ("Before and after path rewrite, statistic files size should be same" );
305305 }
306306 result .add (Pair .of (before .path (), after .path ()));
307307 }
0 commit comments