Skip to content

Commit b9ca107

Browse files
committed
address review comments
1 parent 30f99f5 commit b9ca107

2 files changed

Lines changed: 31 additions & 43 deletions

File tree

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -70,17 +70,10 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
7070

7171
HiveConf conf = new HiveConf(context.getConf());
7272
CompactionInfo ci = context.getCompactionInfo();
73-
org.apache.hadoop.hive.ql.metadata.Table hiveTable =
74-
new org.apache.hadoop.hive.ql.metadata.Table(context.getTable());
75-
boolean rowLineageEnabled = RowLineageUtils.supportsRowLineage(hiveTable);
76-
String compactionQuery = buildCompactionQuery(context, compactTableName, conf, rowLineageEnabled);
7773

78-
SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties);
74+
String compactionQuery = buildCompactionQuery(context, compactTableName, conf);
7975

80-
if (rowLineageEnabled) {
81-
RowLineageUtils.enableRowLineage(sessionState);
82-
LOG.debug("Row lineage flag set for compaction of table {}", compactTableName);
83-
}
76+
SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties);
8477

8578
String compactionTarget = "table " + HiveUtils.unparseIdentifier(compactTableName) +
8679
(ci.partName != null ? ", partition " + HiveUtils.unparseIdentifier(ci.partName) : "");
@@ -98,22 +91,31 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
9891
}
9992
}
10093

101-
private String buildCompactionQuery(CompactorContext context, String compactTableName, HiveConf conf,
102-
boolean rowLineageEnabled)
94+
private String buildCompactionQuery(CompactorContext context, String compactTableName, HiveConf conf)
10395
throws HiveException {
10496
CompactionInfo ci = context.getCompactionInfo();
105-
String rowLineageColumns = RowLineageUtils.getRowLineageSelectColumns(rowLineageEnabled);
10697
org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(),
10798
context.getTable().getTableName());
10899
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
109100
String orderBy = ci.orderByClause == null ? "" : ci.orderByClause;
110101
String fileSizePredicate = buildMinorFileSizePredicate(ci, compactTableName, conf, table);
111102

103+
String columnsList = "*";
104+
if (RowLineageUtils.supportsRowLineage(table)) {
105+
RowLineageUtils.enableRowLineage(conf);
106+
LOG.debug("Row lineage flag set for compaction of table {}", compactTableName);
107+
if (ci.isMajorCompaction() && ci.partName == null) {
108+
columnsList = buildSelectColumnList(icebergTable, conf) + RowLineageUtils.getRowLineageColumnsForCompaction();
109+
} else {
110+
columnsList = columnsList + RowLineageUtils.getRowLineageColumnsForCompaction();
111+
}
112+
}
113+
112114
String compactionQuery = (ci.partName == null) ?
113115
buildFullTableCompactionQuery(compactTableName, conf, icebergTable,
114-
rowLineageColumns, fileSizePredicate, orderBy) :
116+
columnsList, fileSizePredicate, orderBy) :
115117
buildPartitionCompactionQuery(ci, compactTableName, conf, icebergTable,
116-
rowLineageColumns, fileSizePredicate, orderBy);
118+
columnsList, fileSizePredicate, orderBy);
117119

118120
LOG.info("Compaction query: {}", compactionQuery);
119121
return compactionQuery;
@@ -139,15 +141,14 @@ private String buildFullTableCompactionQuery(
139141
String compactTableName,
140142
HiveConf conf,
141143
Table icebergTable,
142-
String rowLineageColumns,
144+
String columnsList,
143145
String fileSizePredicate,
144146
String orderBy) throws HiveException {
145-
String selectColumns = buildSelectColumnList(icebergTable, conf);
146147

147148
if (!icebergTable.spec().isPartitioned()) {
148149
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
149-
return String.format("insert overwrite table %1$s select %2$s%3$s from %1$s %4$s %5$s",
150-
compactTableName, selectColumns, rowLineageColumns,
150+
return String.format("insert overwrite table %1$s select %2$s from %1$s %3$s %4$s",
151+
compactTableName, columnsList,
151152
fileSizePredicate == null ? "" : "where " + fileSizePredicate, orderBy);
152153
}
153154

@@ -156,9 +157,9 @@ private String buildFullTableCompactionQuery(
156157
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
157158
// A single filter on a virtual column causes errors during compilation,
158159
// added another filter on file_path as a workaround.
159-
return String.format("insert overwrite table %1$s select %2$s%3$s from %1$s " +
160-
"where %4$s != %5$d and %6$s is not null %7$s %8$s",
161-
compactTableName, selectColumns, rowLineageColumns,
160+
return String.format("insert overwrite table %1$s select %2$s from %1$s " +
161+
"where %3$s != %4$d and %5$s is not null %6$s %7$s",
162+
compactTableName, columnsList,
162163
VirtualColumn.PARTITION_SPEC_ID.getName(), icebergTable.spec().specId(),
163164
VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? "" : "and " + fileSizePredicate, orderBy);
164165
}
@@ -173,7 +174,7 @@ private String buildPartitionCompactionQuery(
173174
String compactTableName,
174175
HiveConf conf,
175176
Table icebergTable,
176-
String rowLineageColumns,
177+
String columnsList,
177178
String fileSizePredicate,
178179
String orderBy) throws HiveException {
179180
HiveConf.setBoolVar(conf, ConfVars.HIVE_CONVERT_JOIN, false);
@@ -190,9 +191,9 @@ private String buildPartitionCompactionQuery(
190191
throw new HiveException(e);
191192
}
192193

193-
return String.format("INSERT OVERWRITE TABLE %1$s SELECT *%2$s FROM %1$s WHERE %3$s IN " +
194+
return String.format("INSERT OVERWRITE TABLE %1$s SELECT %2$s FROM %1$s WHERE %3$s IN " +
194195
"(SELECT FILE_PATH FROM %1$s.FILES WHERE %4$s AND SPEC_ID = %5$d) %6$s %7$s",
195-
compactTableName, rowLineageColumns, VirtualColumn.FILE_PATH.getName(), partitionPredicate, spec.specId(),
196+
compactTableName, columnsList, VirtualColumn.FILE_PATH.getName(), partitionPredicate, spec.specId(),
196197
fileSizePredicate == null ? "" : "AND " + fileSizePredicate, orderBy);
197198
}
198199

ql/src/java/org/apache/hadoop/hive/ql/metadata/RowLineageUtils.java

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Map;
2525

2626
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.hive.conf.HiveConf;
2728
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
2829
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
2930
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
@@ -86,10 +87,8 @@ public static boolean supportsRowLineage(Table table) {
8687
* Returns the row lineage virtual columns with the leading comma for string concatenation.
8788
* Example: {@code ", ROW__LINEAGE__ID, LAST__UPDATED__SEQUENCE__NUMBER"}.
8889
*/
89-
public static String getRowLineageSelectColumns(boolean rowLineageEnabled) {
90-
return rowLineageEnabled
91-
? ", " + VirtualColumn.ROW_LINEAGE_ID.getName() + ", " + VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER.getName()
92-
: "";
90+
public static String getRowLineageColumnsForCompaction() {
91+
return ", " + VirtualColumn.ROW_LINEAGE_ID.getName() + ", " + VirtualColumn.LAST_UPDATED_SEQUENCE_NUMBER.getName();
9392
}
9493

9594
/**
@@ -99,24 +98,12 @@ public static void setRowLineage(Configuration conf, boolean enabled) {
9998
SessionStateUtil.addResource(conf, SessionStateUtil.ROW_LINEAGE, enabled);
10099
}
101100

102-
private static void setRowLineageConfFlag(Configuration conf, boolean enabled) {
103-
if (enabled) {
104-
conf.setBoolean(SessionStateUtil.ROW_LINEAGE, true);
105-
} else {
106-
conf.unset(SessionStateUtil.ROW_LINEAGE);
107-
}
108-
}
109-
110-
/**
111-
* Enable the row lineage session flag for the current statement execution.
112-
* Returns {@code true} if the flag was enabled
113-
*/
114-
public static void enableRowLineage(SessionState sessionState) {
115-
setRowLineageConfFlag(sessionState.getConf(), true);
101+
public static void enableRowLineage(Configuration conf) {
102+
conf.setBoolean(SessionStateUtil.ROW_LINEAGE, true);
116103
}
117104

118105
public static void disableRowLineage(SessionState sessionState) {
119-
setRowLineageConfFlag(sessionState.getConf(), false);
106+
sessionState.getConf().setBoolean(SessionStateUtil.ROW_LINEAGE, false);
120107
}
121108

122109
public static boolean shouldAddRowLineageColumnsForMerge(MergeStatement mergeStatement, Configuration conf) {

0 commit comments

Comments
 (0)