|
40 | 40 | import org.apache.iceberg.types.Type; |
41 | 41 | import org.apache.iceberg.types.Types; |
42 | 42 | import org.apache.iceberg.util.UUIDUtil; |
| 43 | +import org.apache.orc.ColumnStatistics; |
| 44 | +import org.apache.orc.OrcFile; |
| 45 | +import org.apache.orc.Reader; |
| 46 | +import org.apache.orc.TypeDescription; |
43 | 47 | import org.apache.parquet.column.Encoding; |
44 | 48 | import org.apache.parquet.column.statistics.Statistics; |
45 | 49 | import org.apache.parquet.hadoop.ParquetFileReader; |
@@ -208,7 +212,7 @@ public void testExecuteWithSourceTable(@Mocked Table table, @Mocked IcebergHiveC |
208 | 212 | () -> procedure.execute(context, args)); |
209 | 213 |
|
210 | 214 | // The exception should be about Hive table access, not "not implemented" |
211 | | - assertTrue(exception.getMessage().contains("Failed to access source table") || |
| 215 | + assertTrue(exception.getMessage().contains("Failed to access source table") || |
212 | 216 | exception.getMessage().contains("not found") || |
213 | 217 | exception.getMessage().contains("not a Hive table")); |
214 | 218 | } |
@@ -598,6 +602,95 @@ public ParquetFileReader open(HadoopInputFile ignored) { |
598 | 602 | assertTrue(metrics.upperBounds().isEmpty()); |
599 | 603 | } |
600 | 604 |
|
| 605 | + |
| 606 | + @Test |
| 607 | + public void testExtractOrcMetrics(@Mocked Table table, |
| 608 | + @Mocked IcebergHiveCatalog catalog, |
| 609 | + @Mocked Reader orcReader) throws Exception { |
| 610 | + AddFilesProcedure procedure = AddFilesProcedure.getInstance(); |
| 611 | + IcebergTableProcedureContext context = |
| 612 | + new IcebergTableProcedureContext(catalog, table, null, null, HDFS_ENVIRONMENT, null, null); |
| 613 | + |
| 614 | + Schema schema = new Schema( |
| 615 | + Types.NestedField.optional(1, "id", Types.IntegerType.get()), |
| 616 | + Types.NestedField.optional(2, "name", Types.StringType.get()) |
| 617 | + ); |
| 618 | + |
| 619 | + FileStatus fileStatus = new FileStatus(2048L, false, 1, 0L, 0L, new Path("/tmp/data.orc")); |
| 620 | + |
| 621 | + TypeDescription orcSchema = TypeDescription.createStruct() |
| 622 | + .addField("id", TypeDescription.createInt()) |
| 623 | + .addField("name", TypeDescription.createString()); |
| 624 | + |
| 625 | + |
| 626 | + long totalRows = 100L; |
| 627 | + |
| 628 | + // index 0: root struct statistics (should not be mapped to any column) |
| 629 | + ColumnStatistics rootStats = Mockito.mock(ColumnStatistics.class); |
| 630 | + Mockito.when(rootStats.getNumberOfValues()).thenReturn(totalRows); |
| 631 | + Mockito.when(rootStats.hasNull()).thenReturn(false); |
| 632 | + |
| 633 | + // index 1: "id" column statistics, 90 non-null values, has nulls |
| 634 | + ColumnStatistics idStats = Mockito.mock(ColumnStatistics.class); |
| 635 | + Mockito.when(idStats.getNumberOfValues()).thenReturn(90L); |
| 636 | + Mockito.when(idStats.hasNull()).thenReturn(true); |
| 637 | + |
| 638 | + // index 2: "name" column statistics, 80 non-null values, has nulls |
| 639 | + ColumnStatistics nameStats = Mockito.mock(ColumnStatistics.class); |
| 640 | + Mockito.when(nameStats.getNumberOfValues()).thenReturn(80L); |
| 641 | + Mockito.when(nameStats.hasNull()).thenReturn(true); |
| 642 | + |
| 643 | + ColumnStatistics[] allStats = new ColumnStatistics[] {rootStats, idStats, nameStats}; |
| 644 | + |
| 645 | + new Expectations() { |
| 646 | + { |
| 647 | + table.schema(); |
| 648 | + result = schema; |
| 649 | + |
| 650 | + orcReader.getNumberOfRows(); |
| 651 | + result = totalRows; |
| 652 | + |
| 653 | + orcReader.getSchema(); |
| 654 | + result = orcSchema; |
| 655 | + |
| 656 | + orcReader.getStatistics(); |
| 657 | + result = allStats; |
| 658 | + } |
| 659 | + }; |
| 660 | + |
| 661 | + new MockUp<OrcFile>() { |
| 662 | + @Mock |
| 663 | + public Reader createReader(Path path, OrcFile.ReaderOptions opts) { |
| 664 | + return orcReader; |
| 665 | + } |
| 666 | + }; |
| 667 | + |
| 668 | + java.lang.reflect.Method method = AddFilesProcedure.class.getDeclaredMethod("extractOrcMetrics", |
| 669 | + IcebergTableProcedureContext.class, Table.class, FileStatus.class); |
| 670 | + method.setAccessible(true); |
| 671 | + Metrics metrics = (Metrics) method.invoke(procedure, context, table, fileStatus); |
| 672 | + |
| 673 | + assertEquals(totalRows, metrics.recordCount()); |
| 674 | + |
| 675 | + |
| 676 | + assertNotNull(metrics.valueCounts()); |
| 677 | + assertTrue(metrics.valueCounts().containsKey(1)); |
| 678 | + assertEquals(90L, metrics.valueCounts().get(1)); |
| 679 | + |
| 680 | + assertTrue(metrics.valueCounts().containsKey(2)); |
| 681 | + assertEquals(80L, metrics.valueCounts().get(2)); |
| 682 | + |
| 683 | + assertNotNull(metrics.nullValueCounts()); |
| 684 | + assertTrue(metrics.nullValueCounts().containsKey(1)); |
| 685 | + assertEquals(10L, metrics.nullValueCounts().get(1)); |
| 686 | + |
| 687 | + assertTrue(metrics.nullValueCounts().containsKey(2)); |
| 688 | + assertEquals(20L, metrics.nullValueCounts().get(2)); |
| 689 | + |
| 690 | + // Only 2 columns should have valueCounts, rootStats should not be mapped to any column |
| 691 | + assertEquals(2, metrics.valueCounts().size()); |
| 692 | + } |
| 693 | + |
601 | 694 | private IcebergTableProcedureContext createMockContext(@Mocked Table table, @Mocked IcebergHiveCatalog catalog) { |
602 | 695 | ConnectContext ctx = Mockito.mock(ConnectContext.class); |
603 | 696 | AlterTableStmt stmt = Mockito.mock(AlterTableStmt.class); |
|
0 commit comments