Skip to content

Commit 5e9a1ca

Browse files
authored
perf: reduce nativeIcebergScanMetadata serialization points (apache#3243)
1 parent 313a9d9 commit 5e9a1ca

2 files changed

Lines changed: 8 additions & 13 deletions

File tree

spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import org.apache.comet.iceberg.CometIcebergNativeScanMetadata
4141
case class CometBatchScanExec(
4242
wrapped: BatchScanExec,
4343
runtimeFilters: Seq[Expression],
44-
nativeIcebergScanMetadata: Option[CometIcebergNativeScanMetadata] = None)
44+
@transient nativeIcebergScanMetadata: Option[CometIcebergNativeScanMetadata] = None)
4545
extends DataSourceV2ScanExecBase
4646
with CometPlan {
4747
def ordering: Option[Seq[SortOrder]] = wrapped.ordering
@@ -99,26 +99,23 @@ case class CometBatchScanExec(
9999
override def equals(other: Any): Boolean = other match {
100100
case other: CometBatchScanExec =>
101101
// `wrapped` in `this` and `other` could reference to the same `BatchScanExec` object,
102-
// check `runtimeFilters` and `nativeIcebergScanMetadata` equality too.
103-
this.wrappedScan == other.wrappedScan && this.runtimeFilters == other.runtimeFilters &&
104-
this.nativeIcebergScanMetadata == other.nativeIcebergScanMetadata
102+
// check `runtimeFilters` equality too.
103+
this.wrappedScan == other.wrappedScan && this.runtimeFilters == other.runtimeFilters
105104
case _ =>
106105
false
107106
}
108107

109108
override def hashCode(): Int = {
110-
Objects.hashCode(
111-
wrappedScan,
112-
runtimeFilters,
113-
Integer.valueOf(nativeIcebergScanMetadata.map(_.hashCode()).getOrElse(0)))
109+
Objects.hashCode(wrappedScan, runtimeFilters)
114110
}
115111

116112
override def doCanonicalize(): CometBatchScanExec = {
117113
this.copy(
118114
wrapped = wrappedScan.doCanonicalize(),
119115
runtimeFilters = QueryPlan.normalizePredicates(
120116
runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
121-
output))
117+
output),
118+
nativeIcebergScanMetadata = None)
122119
}
123120

124121
override def nodeName: String = {

spark/src/main/scala/org/apache/spark/sql/comet/CometIcebergNativeScanExec.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,7 @@ case class CometIcebergNativeScanExec(
166166
this.metadataLocation == other.metadataLocation &&
167167
this.output == other.output &&
168168
this.serializedPlanOpt == other.serializedPlanOpt &&
169-
this.numPartitions == other.numPartitions &&
170-
this.nativeIcebergScanMetadata == other.nativeIcebergScanMetadata
169+
this.numPartitions == other.numPartitions
171170
case _ =>
172171
false
173172
}
@@ -178,8 +177,7 @@ case class CometIcebergNativeScanExec(
178177
metadataLocation,
179178
output.asJava,
180179
serializedPlanOpt,
181-
numPartitions: java.lang.Integer,
182-
nativeIcebergScanMetadata)
180+
numPartitions: java.lang.Integer)
183181
}
184182

185183
object CometIcebergNativeScanExec {

0 commit comments

Comments
 (0)