|
1 | 1 | /* |
2 | | - * Licensed to the Apache Software Foundation (ASF) under one |
3 | | - * or more contributor license agreements. See the NOTICE file |
4 | | - * distributed with this work for additional information |
5 | | - * regarding copyright ownership. The ASF licenses this file |
6 | | - * to you under the Apache License, Version 2.0 (the |
7 | | - * "License"); you may not use this file except in compliance |
8 | | - * with the License. You may obtain a copy of the License at |
| 2 | + * Licensed to the Apache Software Foundation (ASF) under one or more |
| 3 | + * contributor license agreements. See the NOTICE file distributed with |
| 4 | + * this work for additional information regarding copyright ownership. |
| 5 | + * The ASF licenses this file to You under the Apache License, Version 2.0 |
| 6 | + * (the "License"); you may not use this file except in compliance with |
| 7 | + * the License. You may obtain a copy of the License at |
9 | 8 | * |
10 | | - * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * http://www.apache.org/licenses/LICENSE-2.0 |
11 | 10 | * |
12 | | - * Unless required by applicable law or agreed to in writing, |
13 | | - * software distributed under the License is distributed on an |
14 | | - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
15 | | - * KIND, either express or implied. See the License for the |
16 | | - * specific language governing permissions and limitations |
17 | | - * under the License. |
| 11 | + * Unless required by applicable law or agreed to in writing, software |
| 12 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | + * See the License for the specific language governing permissions and |
| 15 | + * limitations under the License. |
18 | 16 | */ |
19 | 17 | package org.apache.hadoop.ozone.iceberg; |
20 | 18 |
|
| 19 | +import java.io.BufferedWriter; |
| 20 | +import java.io.IOException; |
| 21 | +import java.io.OutputStreamWriter; |
| 22 | +import java.nio.charset.StandardCharsets; |
| 23 | +import java.util.HashSet; |
| 24 | +import java.util.List; |
21 | 25 | import java.util.Objects; |
| 26 | +import java.util.Set; |
22 | 27 | import java.util.UUID; |
23 | 28 | import java.util.concurrent.ExecutorService; |
24 | 29 | import java.util.concurrent.Executors; |
25 | 30 | import java.util.concurrent.TimeUnit; |
26 | | - |
| 31 | +import org.apache.iceberg.BaseTable; |
27 | 32 | import org.apache.iceberg.HasTableOperations; |
28 | 33 | import org.apache.iceberg.RewriteTablePathUtil; |
| 34 | +import org.apache.iceberg.RewriteTablePathUtil.RewriteResult; |
| 35 | +import org.apache.iceberg.Snapshot; |
| 36 | +import org.apache.iceberg.StaticTableOperations; |
| 37 | +import org.apache.iceberg.StatisticsFile; |
29 | 38 | import org.apache.iceberg.Table; |
30 | 39 | import org.apache.iceberg.TableMetadata; |
31 | 40 | import org.apache.iceberg.TableMetadata.MetadataLogEntry; |
| 41 | +import org.apache.iceberg.TableMetadataParser; |
32 | 42 | import org.apache.iceberg.actions.ImmutableRewriteTablePath; |
33 | 43 | import org.apache.iceberg.actions.RewriteTablePath; |
| 44 | +import org.apache.iceberg.exceptions.RuntimeIOException; |
| 45 | +import org.apache.iceberg.io.FileIO; |
| 46 | +import org.apache.iceberg.io.OutputFile; |
| 47 | +import org.apache.iceberg.util.Pair; |
34 | 48 |
|
| 49 | +/** |
| 50 | + * An implementation of {@link RewriteTablePath} for Apache Ozone backed Iceberg tables. |
| 51 | + * |
 | 52 | + * <p>This action rewrites a table's metadata and position delete file paths by replacing a source |
| 53 | + * prefix with a target prefix. It processes table versions, snapshots, manifests and position delete files.</p> |
| 54 | + * |
| 55 | + * <p>The rewrite can be scoped between optional start and end metadata versions, |
| 56 | + * and all rewritten files are staged in a temporary directory.</p> |
| 57 | + */ |
35 | 58 | public class RewriteTablePathOzoneAction implements RewriteTablePath { |
36 | 59 |
|
37 | 60 | private static final String RESULT_LOCATION = "file-list"; |
@@ -184,8 +207,105 @@ private boolean versionInFilePath(String path, String version) { |
184 | 207 | } |
185 | 208 |
|
186 | 209 | private String rebuildMetadata() { |
187 | | - //TODO need to implement rewrite of metadata files , manifest list , manifest files and position delete files. |
188 | | - return null; |
| 210 | + //TODO need to implement rewrite of manifest list , manifest files and position delete files. |
| 211 | + TableMetadata startMetadata = startVersionName != null |
| 212 | + ? ((HasTableOperations) newStaticTable(startVersionName, table.io())) |
| 213 | + .operations() |
| 214 | + .current() |
| 215 | + : null; |
| 216 | + TableMetadata endMetadata = |
| 217 | + ((HasTableOperations) newStaticTable(endVersionName, table.io())).operations().current(); |
| 218 | + |
| 219 | + if (endMetadata.partitionStatisticsFiles() != null |
| 220 | + && !endMetadata.partitionStatisticsFiles().isEmpty()) { |
| 221 | + throw new IllegalArgumentException("Partition statistics files are not supported yet."); |
| 222 | + } |
| 223 | + |
| 224 | + RewriteResult<Snapshot> rewriteVersionResult = rewriteVersionFiles(endMetadata); |
| 225 | + |
| 226 | + Set<Pair<String, String>> copyPlan = new HashSet<>(); |
| 227 | + copyPlan.addAll(rewriteVersionResult.copyPlan()); |
| 228 | + |
| 229 | + return saveFileList(copyPlan); |
| 230 | + } |
| 231 | + |
| 232 | + private String saveFileList(Set<Pair<String, String>> filesToMove) { |
| 233 | + String fileListPath = stagingDir + RESULT_LOCATION; |
| 234 | + OutputFile fileList = table.io().newOutputFile(fileListPath); |
| 235 | + writeAsCsv(filesToMove, fileList); |
| 236 | + return fileListPath; |
| 237 | + } |
| 238 | + |
| 239 | + private void writeAsCsv(Set<Pair<String, String>> rows, OutputFile outputFile) { |
| 240 | + try (BufferedWriter writer = new BufferedWriter( |
| 241 | + new OutputStreamWriter(outputFile.createOrOverwrite(), StandardCharsets.UTF_8))) { |
| 242 | + for (Pair<String, String> pair : rows) { |
| 243 | + writer.write(String.join(",", pair.first(), pair.second())); |
| 244 | + writer.newLine(); |
| 245 | + } |
| 246 | + } catch (IOException e) { |
| 247 | + throw new RuntimeIOException(e); |
| 248 | + } |
| 249 | + } |
| 250 | + |
| 251 | + private RewriteResult<Snapshot> rewriteVersionFiles(TableMetadata endMetadata) { |
| 252 | + RewriteResult<Snapshot> result = new RewriteResult<>(); |
| 253 | + result.toRewrite().addAll(endMetadata.snapshots()); |
| 254 | + result.copyPlan().addAll(rewriteVersionFile(endMetadata, endVersionName)); |
| 255 | + |
| 256 | + List<MetadataLogEntry> versions = endMetadata.previousFiles(); |
| 257 | + for (int i = versions.size() - 1; i >= 0; i--) { |
| 258 | + String versionFilePath = versions.get(i).file(); |
| 259 | + if (versionFilePath.equals(startVersionName)) { |
| 260 | + break; |
| 261 | + } |
| 262 | + |
| 263 | + if (!fileExist(versionFilePath)) { |
| 264 | + throw new IllegalArgumentException(String.format("Version file %s doesn't exist", versionFilePath)); |
| 265 | + } |
| 266 | + |
| 267 | + TableMetadata tableMetadata = new StaticTableOperations(versionFilePath, table.io()).current(); |
| 268 | + |
| 269 | + result.toRewrite().addAll(tableMetadata.snapshots()); |
| 270 | + result.copyPlan().addAll(rewriteVersionFile(tableMetadata, versionFilePath)); |
| 271 | + } |
| 272 | + |
| 273 | + return result; |
| 274 | + } |
| 275 | + |
| 276 | + private Set<Pair<String, String>> rewriteVersionFile(TableMetadata metadata, String versionFilePath) { |
| 277 | + Set<Pair<String, String>> result = new HashSet<>(); |
| 278 | + String stagingPath = |
| 279 | + RewriteTablePathUtil.stagingPath(versionFilePath, sourcePrefix, stagingDir); |
| 280 | + System.out.println("Processing version file " + versionFilePath); |
| 281 | + TableMetadata newTableMetadata = RewriteTablePathUtil.replacePaths(metadata, sourcePrefix, targetPrefix); |
| 282 | + TableMetadataParser.overwrite(newTableMetadata, table.io().newOutputFile(stagingPath)); |
| 283 | + result.add(Pair.of(stagingPath, RewriteTablePathUtil.newPath(versionFilePath, sourcePrefix, targetPrefix))); |
| 284 | + result.addAll(statsFileCopyPlan(metadata.statisticsFiles(), newTableMetadata.statisticsFiles())); |
| 285 | + |
| 286 | + return result; |
| 287 | + } |
| 288 | + |
| 289 | + private Set<Pair<String, String>> statsFileCopyPlan(List<StatisticsFile> beforeStats, |
| 290 | + List<StatisticsFile> afterStats) { |
| 291 | + Set<Pair<String, String>> result = new HashSet<>(); |
| 292 | + if (beforeStats.isEmpty()) { |
| 293 | + return result; |
| 294 | + } |
| 295 | + |
| 296 | + if (beforeStats.size() != afterStats.size()) { |
| 297 | + throw new IllegalArgumentException("Before and after path rewrite, statistic files count should be same"); |
| 298 | + } |
| 299 | + |
| 300 | + for (int i = 0; i < beforeStats.size(); i++) { |
| 301 | + StatisticsFile before = beforeStats.get(i); |
| 302 | + StatisticsFile after = afterStats.get(i); |
| 303 | + if (before.fileSizeInBytes() != after.fileSizeInBytes()) { |
| 304 | + throw new IllegalArgumentException("Before and after path rewrite, statistic files count should be same"); |
| 305 | + } |
| 306 | + result.add(Pair.of(before.path(), after.path())); |
| 307 | + } |
| 308 | + return result; |
189 | 309 | } |
190 | 310 |
|
191 | 311 | private boolean fileExist(String path) { |
@@ -216,4 +336,9 @@ private static void checkNonNullNonEmpty(String value, String name) { |
216 | 336 | throw new IllegalArgumentException(name + " is empty"); |
217 | 337 | } |
218 | 338 | } |
| 339 | + |
| 340 | + private Table newStaticTable(String metadataFileLocation, FileIO io) { |
| 341 | + StaticTableOperations ops = new StaticTableOperations(metadataFileLocation, io); |
| 342 | + return new BaseTable(ops, metadataFileLocation); |
| 343 | + } |
219 | 344 | } |
0 commit comments