diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeleteSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeleteSummary.java index 76ece79b09a96..f24b3e998b2e9 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeleteSummary.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DeleteSummary.java @@ -36,4 +36,17 @@ public interface DeleteSummary extends WriteSummary { * Returns the number of rows copied unmodified, or -1 if not found. */ long numCopiedRows(); + + /** + * Returns the time, in milliseconds, spent in the runtime group-filter subquery that identifies + * which target files contain rows matching the delete condition. Returns -1 when no such + * subquery was injected. + */ + long groupFilterTimeMs(); + + /** + * Returns the wall-clock time, in milliseconds, of the main write job that scans pruned + * target files, applies the delete, and writes output. + */ + long writeJobTimeMs(); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java index e5ae57a767080..229a014d89fce 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/MergeSummary.java @@ -69,4 +69,17 @@ public interface MergeSummary extends WriteSummary { * or -1 if not found. */ long numTargetRowsNotMatchedBySourceDeleted(); + + /** + * Returns the time, in milliseconds, spent in the runtime group-filter subquery that identifies + * which target files contain rows matching the merge condition. Returns -1 when no such + * subquery was injected. + */ + long groupFilterTimeMs(); + + /** + * Returns the wall-clock time, in milliseconds, of the main write job that scans pruned + * target files, applies the merge logic, and writes output. + */ + long writeJobTimeMs(); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/UpdateSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/UpdateSummary.java index 99e9fcc1003ad..705d1969fab83 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/UpdateSummary.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/UpdateSummary.java @@ -36,4 +36,17 @@ public interface UpdateSummary extends WriteSummary { * Returns the number of rows copied unmodified, or -1 if not found. */ long numCopiedRows(); + + /** + * Returns the time, in milliseconds, spent in the runtime group-filter subquery that identifies + * which target files contain rows matching the update condition. Returns -1 when no such + * subquery was injected. + */ + long groupFilterTimeMs(); + + /** + * Returns the wall-clock time, in milliseconds, of the main write job that scans pruned + * target files, applies the updates, and writes output. + */ + long writeJobTimeMs(); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteSummary.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteSummary.java index a8ae462fd3c03..8d2527e80a52f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteSummary.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/WriteSummary.java @@ -26,4 +26,10 @@ */ @Evolving public interface WriteSummary { + + /** + * Returns the total time, in milliseconds, taken to execute the write operation up to (but not + * including) the connector commit. + */ + long executionTimeMs(); } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/DeleteSummaryImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/DeleteSummaryImpl.scala index b96bf86a57681..4b07bd987c552 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/DeleteSummaryImpl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/DeleteSummaryImpl.scala @@ -22,6 +22,9 @@ package org.apache.spark.sql.connector.write */ private[sql] case class DeleteSummaryImpl( numDeletedRows: Long, - numCopiedRows: Long) + numCopiedRows: Long, + executionTimeMs: Long, + groupFilterTimeMs: Long, + writeJobTimeMs: Long) extends DeleteSummary { } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/InsertSummaryImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/InsertSummaryImpl.scala index 97c2e082c2573..2a3d079279717 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/InsertSummaryImpl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/InsertSummaryImpl.scala @@ -20,5 +20,8 @@ package org.apache.spark.sql.connector.write /** * Implementation of [[InsertSummary]] that provides INSERT operation summary. */ -private[sql] case class InsertSummaryImpl(numInsertedRows: Long) extends InsertSummary { +private[sql] case class InsertSummaryImpl( + numInsertedRows: Long, + executionTimeMs: Long) + extends InsertSummary { } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala index 911749072c43c..a4ba31ef8b3f0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/MergeSummaryImpl.scala @@ -28,6 +28,9 @@ private[sql] case class MergeSummaryImpl( numTargetRowsMatchedUpdated: Long, numTargetRowsMatchedDeleted: Long, numTargetRowsNotMatchedBySourceUpdated: Long, - numTargetRowsNotMatchedBySourceDeleted: Long) + numTargetRowsNotMatchedBySourceDeleted: Long, + executionTimeMs: Long, + groupFilterTimeMs: Long, + writeJobTimeMs: Long) extends MergeSummary { } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/UpdateSummaryImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/UpdateSummaryImpl.scala index fc5cef30f000d..0fb8073daa136 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/UpdateSummaryImpl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/write/UpdateSummaryImpl.scala @@ -22,6 +22,9 @@ package org.apache.spark.sql.connector.write */ private[sql] case class UpdateSummaryImpl( numUpdatedRows: Long, - numCopiedRows: Long) + numCopiedRows: Long, + executionTimeMs: Long, + groupFilterTimeMs: Long, + writeJobTimeMs: Long) extends UpdateSummary { } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 33709fbd5f5a7..73d50a7595b01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.util.concurrent.TimeUnit + import scala.jdk.CollectionConverters._ import org.apache.spark.{SparkEnv, SparkException, TaskContext} @@ -36,7 +38,7 @@ import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeleteSummaryImpl, DeltaWrite, DeltaWriter, InsertSummaryImpl, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperation, RowLevelOperationTable, UpdateSummaryImpl, Write, WriterCommitMessage, WriteSummary} import org.apache.spark.sql.connector.write.RowLevelOperation.Command._ import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} -import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution, UnaryExecNode} +import org.apache.spark.sql.execution.{ExecSubqueryExpression, QueryExecution, SparkPlan, SQLExecution, UnaryExecNode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLLastAttemptMetric, SQLLastAttemptMetrics, SQLMetric, SQLMetrics} import org.apache.spark.sql.types.StructType @@ -296,7 +298,9 @@ case class AppendDataExec( copy(query = newChild) override protected def getWriteSummary(): Option[WriteSummary] = - Some(InsertSummaryImpl(numInsertedRows = numOutputRowsMetric.value)) + Some(InsertSummaryImpl( + numInsertedRows = numOutputRowsMetric.value, + executionTimeMs = executionTimeMsMetric.value)) } /** @@ -324,7 +328,10 @@ case class InsertOnlyMergeExec( numTargetRowsMatchedUpdated = 0L, numTargetRowsMatchedDeleted = 0L, numTargetRowsNotMatchedBySourceUpdated = 0L, - numTargetRowsNotMatchedBySourceDeleted = 0L)) + numTargetRowsNotMatchedBySourceDeleted = 0L, + executionTimeMs = executionTimeMsMetric.value, + groupFilterTimeMs = groupFilterTimeMs, + writeJobTimeMs = writeJobTimeMsMetric.value)) } /** @@ -558,20 +565,29 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec { getMetricValue(metrics, "numTargetRowsMatchedUpdated"), getMetricValue(metrics, "numTargetRowsMatchedDeleted"), getMetricValue(metrics, "numTargetRowsNotMatchedBySourceUpdated"), - getMetricValue(metrics, "numTargetRowsNotMatchedBySourceDeleted")) + getMetricValue(metrics, "numTargetRowsNotMatchedBySourceDeleted"), + executionTimeMsMetric.value, + groupFilterTimeMs, + writeJobTimeMsMetric.value) } } protected def getUpdateSummary(): Option[UpdateSummaryImpl] = { Some(UpdateSummaryImpl( getMetricValue(sparkMetrics, "numUpdatedRows"), - getMetricValue(sparkMetrics, "numCopiedRows"))) + getMetricValue(sparkMetrics, "numCopiedRows"), + executionTimeMsMetric.value, + groupFilterTimeMs, + writeJobTimeMsMetric.value)) } protected def getDeleteSummary(): Option[DeleteSummaryImpl] = { Some(DeleteSummaryImpl( getMetricValue(sparkMetrics, "numDeletedRows"), - getMetricValue(sparkMetrics, "numCopiedRows"))) + getMetricValue(sparkMetrics, "numCopiedRows"), + executionTimeMsMetric.value, + groupFilterTimeMs, + writeJobTimeMsMetric.value)) } } @@ -596,11 +612,24 @@ trait V2TableWriteExec protected lazy val numOutputRowsMetric: SQLMetric = SQLMetrics.createMetric(sparkContext, "number of output rows") + protected lazy val executionTimeMsMetric: SQLMetric = + SQLMetrics.createTimingMetric(sparkContext, "execution time") + protected lazy val groupFilterTimeMsMetric: SQLMetric = + SQLMetrics.createTimingMetric(sparkContext, "group filter time") + protected lazy val writeJobTimeMsMetric: SQLMetric = + SQLMetrics.createTimingMetric(sparkContext, "write job time") + + // Backing value for `groupFilterTimeMs` in the WriteSummary, which can be -1 unlike SQLMetric. + protected var groupFilterTimeMs: Long = -1L override protected def sparkMetrics: Map[String, SQLMetric] = Map( - "numOutputRows" -> numOutputRowsMetric) + "numOutputRows" -> numOutputRowsMetric, + "executionTimeMs" -> executionTimeMsMetric, + "groupFilterTimeMs" -> groupFilterTimeMsMetric, + "writeJobTimeMs" -> writeJobTimeMsMetric) protected def writeWithV2(batchWrite: BatchWrite): Seq[InternalRow] = { + val startNanos = System.nanoTime() val rdd: RDD[InternalRow] = { val tempRdd = query.execute() // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single @@ -626,6 +655,7 @@ trait V2TableWriteExec val writeMetrics: Map[String, SQLMetric] = customMetrics try { + val rewriteStartNanos = System.nanoTime() sparkContext.runJob( rdd, (context: TaskContext, iter: Iterator[InternalRow]) => @@ -638,9 +668,21 @@ trait V2TableWriteExec batchWrite.onDataWriterCommit(commitMessage) } ) - + executionTimeMsMetric.set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos)) + writeJobTimeMsMetric.set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - rewriteStartNanos)) + groupFilterTimeMs = collectRowLevelGroupFilterTimeMs(query) + // SQLMetric.set ignores negative values, so the metric stays unset (and reads 0 in the UI) + // when no subquery was injected. + groupFilterTimeMsMetric.set(groupFilterTimeMs) val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, Seq(numOutputRowsMetric)) + SQLMetrics.postDriverMetricUpdates( + sparkContext, + executionId, + Seq( + numOutputRowsMetric, + executionTimeMsMetric, + groupFilterTimeMsMetric, + writeJobTimeMsMetric)) val writeSummary = getWriteSummary() logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.") @@ -671,6 +713,27 @@ trait V2TableWriteExec } protected def getWriteSummary(): Option[WriteSummary] = None + + /** + * Sums the `collectTime` of subqueries injected by `RowLevelOperationRuntimeGroupFiltering` + * to identify which target files contain rows matching the row-level operation condition. + * Returns -1 when no such subquery was injected. + */ + private def collectRowLevelGroupFilterTimeMs(plan: SparkPlan): Long = { + var found = false + var totalMs = 0L + foreach(plan) { + case b: BatchScanExec if b.table.isInstanceOf[RowLevelOperationTable] => + b.runtimeFilters.foreach(_.foreach { + case sub: ExecSubqueryExpression => + found = true + sub.plan.metrics.get("collectTime").foreach(m => totalMs += m.value) + case _ => + }) + case _ => + } + if (found) totalMs else -1L + } } trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala index b894d5d75b3c8..d2c5ec8984cea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala @@ -43,13 +43,25 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { protected def checkDeleteMetrics( numDeletedRows: Long, - numCopiedRows: Long): Unit = { + numCopiedRows: Long, + groupFilterTimeMissing: Boolean = false): Unit = { val summary = getDeleteSummary() assert(summary.numDeletedRows() === numDeletedRows, s"Expected numDeletedRows=$numDeletedRows, got ${summary.numDeletedRows()}") val expectedCopied = if (deltaDelete) 0L else numCopiedRows assert(summary.numCopiedRows() === expectedCopied, s"Expected numCopiedRows=$expectedCopied, got ${summary.numCopiedRows()}") + assert(summary.executionTimeMs() >= 0, + s"Expected executionTimeMs >= 0, got ${summary.executionTimeMs()}") + if (groupFilterTimeMissing) { + assert(summary.groupFilterTimeMs() === -1L, + s"Expected groupFilterTimeMs == -1, got ${summary.groupFilterTimeMs()}") + } else { + assert(summary.groupFilterTimeMs() >= 0, + s"Expected groupFilterTimeMs >= 0, got ${summary.groupFilterTimeMs()}") + } + assert(summary.writeJobTimeMs() >= 0, + s"Expected writeJobTimeMs >= 0, got ${summary.writeJobTimeMs()}") } test("delete from table containing added column with default value") { @@ -89,7 +101,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { Row(3, "software", "initial-text"), Row(4, "hr", "initial-text"), Row(6, "hr", "new-text"))) - checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 3) + checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 3, groupFilterTimeMissing = deltaDelete) } test("delete from table with table constraints") { @@ -115,13 +127,13 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Seq(Row(2, 4, "eng"), Row(3, 6, "eng"))) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) sql(s"DELETE FROM $tableNameAsString WHERE pk >=3") checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Seq(Row(2, 4, "eng"))) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaDelete) } test("delete from table containing struct column with default value") { @@ -180,7 +192,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Nil) - checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) } test("delete with literal false condition") { @@ -196,7 +208,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) } test("delete with literal true condition") { @@ -224,7 +236,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { sql(s"SELECT pk, dep FROM $tableNameAsString"), Row(1, "hr") :: Row(2, "software") :: Row(3, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) } test("delete with NULL condition on non-null column") { @@ -240,7 +252,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) } test("delete with basic filters") { @@ -256,7 +268,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaDelete) } test("delete with aliases") { @@ -270,7 +282,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "software") :: Nil) - checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) } test("delete with IN predicates") { @@ -286,7 +298,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "software") :: Row(3, null, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaDelete) } test("delete with NOT IN predicates") { @@ -301,14 +313,14 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(2, 2, "software") :: Row(3, null, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) sql(s"DELETE FROM $tableNameAsString WHERE id NOT IN (1, 10)") checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(3, null, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) } test("delete with conditions on nested columns") { @@ -322,12 +334,12 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, Row(2, "v2"), "software") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) sql(s"DELETE FROM $tableNameAsString t WHERE t.complex.c1 = id") checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) } test("delete with IN subqueries") { @@ -355,7 +367,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) + checkDeleteMetrics( + numDeletedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaDelete) append("pk INT NOT NULL, id INT, dep STRING", """{ "pk": 4, "id": 1, "dep": "hr" } @@ -377,7 +390,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(5, -1, "hr") :: Row(4, 1, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 2) + checkDeleteMetrics( + numDeletedRows = 2, numCopiedRows = 2, groupFilterTimeMissing = deltaDelete) append("pk INT NOT NULL, id INT, dep STRING", """{ "pk": 6, "id": null, "dep": "hr" } @@ -399,7 +413,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(5, -1, "hr") :: Row(4, 1, "hr") :: Row(6, null, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 3) + checkDeleteMetrics( + numDeletedRows = 1, numCopiedRows = 3, groupFilterTimeMissing = deltaDelete) } } @@ -423,7 +438,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) + checkDeleteMetrics( + numDeletedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaDelete) } } @@ -456,7 +472,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { |""".stripMargin) } - checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 2) + checkDeleteMetrics( + numDeletedRows = 2, numCopiedRows = 2, groupFilterTimeMissing = deltaDelete) checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), @@ -490,7 +507,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) + checkDeleteMetrics( + numDeletedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) sql( s"""DELETE FROM $tableNameAsString @@ -499,7 +517,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { |""".stripMargin) checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(3, null, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 1) + checkDeleteMetrics( + numDeletedRows = 2, numCopiedRows = 1, groupFilterTimeMissing = deltaDelete) append("pk INT NOT NULL, id INT, dep STRING", """{ "pk": 4, "id": 1, "dep": "hr" } @@ -520,7 +539,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { |""".stripMargin) checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(5, 2, "hardware") :: Nil) - checkDeleteMetrics(numDeletedRows = 3, numCopiedRows = 0) + checkDeleteMetrics( + numDeletedRows = 3, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) sql( s"""DELETE FROM $tableNameAsString @@ -567,7 +587,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) + checkDeleteMetrics( + numDeletedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) sql( s"""DELETE FROM $tableNameAsString t @@ -578,7 +599,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) + checkDeleteMetrics( + numDeletedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaDelete) sql( s"""DELETE FROM $tableNameAsString t @@ -589,7 +611,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "hardware") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) + checkDeleteMetrics( + numDeletedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) sql( s"""DELETE FROM $tableNameAsString t @@ -602,7 +625,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "hardware") :: Nil) - checkDeleteMetrics(numDeletedRows = 0, numCopiedRows = 0) + checkDeleteMetrics( + numDeletedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) } } @@ -631,7 +655,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(3, null, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) + checkDeleteMetrics( + numDeletedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) sql( s"""DELETE FROM $tableNameAsString t @@ -640,7 +665,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { |""".stripMargin) checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) + checkDeleteMetrics( + numDeletedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaDelete) sql( s"""DELETE FROM $tableNameAsString t @@ -651,7 +677,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { |""".stripMargin) checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) + checkDeleteMetrics( + numDeletedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) } } @@ -675,7 +702,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) + checkDeleteMetrics( + numDeletedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaDelete) } } @@ -708,7 +736,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(3, 2, "hardware") :: Row(4, 3, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 2) + checkDeleteMetrics( + numDeletedRows = 2, numCopiedRows = 2, groupFilterTimeMissing = deltaDelete) // verify the view reflects the changes in the table checkAnswer(sql("SELECT * FROM temp"), Nil) @@ -754,7 +783,7 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, 200) :: Nil) - checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 0, groupFilterTimeMissing = deltaDelete) } test("delete with subquery cannot be converted into delete with filters") { @@ -774,7 +803,8 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, 200) :: Row(3, 3, 100) :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) + checkDeleteMetrics( + numDeletedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaDelete) } } @@ -1020,7 +1050,10 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { // The filter gets replaced by an EmptyRelation in the ReplaceData executed plan, which hides // the executed BatchScan and prevents computing numDeletedRows using numOutputRows of the // scan node. - checkDeleteMetrics(numDeletedRows = if (deltaDelete) 3 else -1, numCopiedRows = 0) + checkDeleteMetrics( + numDeletedRows = if (deltaDelete) 3 else -1, + numCopiedRows = 0, + groupFilterTimeMissing = true) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala index 15d259d44a4fd..33d39596e2e35 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala @@ -54,7 +54,7 @@ class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { expectedMetadataSchema = Some(StructType(Array(PARTITION_FIELD, INDEX_FIELD_NULLABLE)))) checkLastWriteLog(deleteWriteLogEntry(id = 1, metadata = Row("hr", null))) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = true) } test("delete with subquery handles metadata columns correctly") { @@ -86,7 +86,7 @@ class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { expectedMetadataSchema = Some(StructType(Array(PARTITION_FIELD, INDEX_FIELD_NULLABLE)))) checkLastWriteLog(deleteWriteLogEntry(id = 1, metadata = Row("hr", null))) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = true) } } @@ -140,7 +140,7 @@ class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, "us", "software") :: Row(3, 3, "canada", "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = true) } test("delete does not double plan table") { @@ -167,6 +167,6 @@ class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(2, 2, 150, "software") :: Row(3, 3, 120, "hr") :: Nil) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = true) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateTableSuiteBase.scala index 49e586535a0d0..3acf41537a2d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateTableSuiteBase.scala @@ -93,6 +93,6 @@ abstract class DeltaBasedUpdateTableSuiteBase extends UpdateTableSuiteBase { sql(s"SELECT * FROM $tableNameAsString"), Row(1, -1, -1, "invalid") :: Row(2, 2, 200, "software") :: Row(3, 3, 300, "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 0) + checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = true) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala index 3889b0d172adc..2d6be6e3a6c78 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala @@ -121,6 +121,11 @@ class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { checkReplacedPartitions(Seq("software")) checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 1) + + val summary = getDeleteSummary() + assert(summary.groupFilterTimeMs() > 0, + s"Expected groupFilterTimeMs > 0 when the runtime group filter subquery fires, " + + s"got ${summary.groupFilterTimeMs()}") } } @@ -234,7 +239,7 @@ class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { Row(2, 150, "software") :: Row(3, 120, "hr") :: Nil) checkReplacedPartitions(Seq("software", "hr")) - checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 2) + checkDeleteMetrics(numDeletedRows = 1, numCopiedRows = 2, groupFilterTimeMissing = true) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeIntoTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeIntoTableSuite.scala index 95936a749270a..e6fb4373c67f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeIntoTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedMergeIntoTableSuite.scala @@ -181,6 +181,11 @@ class GroupBasedMergeIntoTableSuite extends MergeIntoTableSuiteBase { Row(6, 0, "hr"))) // insert checkReplacedPartitions(Seq("hr")) + + val summary = getMergeSummary() + assert(summary.groupFilterTimeMs() > 0, + s"Expected groupFilterTimeMs > 0 when the runtime group filter subquery fires, " + + s"got ${summary.groupFilterTimeMs()}") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedUpdateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedUpdateTableSuite.scala index 61defff7ebf07..4259761efb14e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedUpdateTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedUpdateTableSuite.scala @@ -118,6 +118,11 @@ class GroupBasedUpdateTableSuite extends UpdateTableSuiteBase { Row(1, -1, "hr") :: Row(2, 150, "software") :: Row(3, 120, "hr") :: Nil) checkReplacedPartitions(Seq("hr")) + + val summary = getUpdateSummary() + assert(summary.groupFilterTimeMs() > 0, + s"Expected groupFilterTimeMs > 0 when the runtime group filter subquery fires, " + + s"got ${summary.groupFilterTimeMs()}") } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index 42017c2dd60eb..3185a197024f6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -226,6 +226,8 @@ trait InsertIntoSQLOnlyTests val summary = inMemoryTable.commits.last.writeSummary.get.asInstanceOf[InsertSummary] assert(summary.numInsertedRows() === numInsertedRows, s"Expected numInsertedRows=$numInsertedRows, got ${summary.numInsertedRows()}") + assert(summary.executionTimeMs() >= 0, + s"Expected executionTimeMs >= 0, got ${summary.executionTimeMs()}") } protected val v2Format: String diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index f37a614f99b53..dfb41e217e468 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -486,7 +486,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(2, 200, "finance"), // insert Row(3, 300, "hr"))) // insert - val mergeSummary = getMergeSummary() + val mergeSummary = getMergeSummary(groupFilterTimeMissing = true) assert(mergeSummary.numTargetRowsInserted === 3L) assert(mergeSummary.numTargetRowsCopied === 0L) assert(mergeSummary.numTargetRowsUpdated === 0L) @@ -522,7 +522,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(2, 200, "finance"), // insert Row(3, 300, "hr"))) // insert - val mergeSummary = getMergeSummary() + val mergeSummary = getMergeSummary(groupFilterTimeMissing = true) assert(mergeSummary.numTargetRowsInserted === 2L) assert(mergeSummary.numTargetRowsCopied === 0L) assert(mergeSummary.numTargetRowsUpdated === 0L) @@ -561,7 +561,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(2, 200, "finance"), // insert Row(3, 300, "hr"))) // insert - val mergeSummary = getMergeSummary() + val mergeSummary = getMergeSummary(groupFilterTimeMissing = true) assert(mergeSummary.numTargetRowsInserted === 3L) assert(mergeSummary.numTargetRowsCopied === 0L) assert(mergeSummary.numTargetRowsUpdated === 0L) @@ -594,7 +594,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase sql(s"SELECT * FROM $tableNameAsString"), Seq(Row(1, 100, "hr"), Row(2, 200, "hardware"))) - val summary = getMergeSummary() + val summary = getMergeSummary(groupFilterTimeMissing = deltaMerge) assert(summary.numTargetRowsUpdated === 0L) assert(summary.numTargetRowsDeleted === 0L) assert(summary.numTargetRowsInserted === 0L) @@ -660,7 +660,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase sql(s"SELECT * FROM $tableNameAsString"), Seq(Row(1, 100, "hr"), Row(2, 200, "hardware"))) - val summary = getMergeSummary() + val summary = getMergeSummary(groupFilterTimeMissing = deltaMerge) assert(summary.numTargetRowsUpdated === 0L) assert(summary.numTargetRowsDeleted === 0L) assert(summary.numTargetRowsInserted === 0L) @@ -2414,7 +2414,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(3, 300, "hr"), Row(5, 400, "executive"))) // inserted - val mergeSummary = getMergeSummary() + val mergeSummary = getMergeSummary(groupFilterTimeMissing = true) assert(mergeSummary.numTargetRowsCopied === 0L) assert(mergeSummary.numTargetRowsInserted === 1L) assert(mergeSummary.numTargetRowsUpdated === 0L) @@ -2468,7 +2468,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(4, 400, "marketing"), Row(5, -1, "executive"))) // updated - val mergeSummary = getMergeSummary() + val mergeSummary = getMergeSummary(groupFilterTimeMissing = true) assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) assert(mergeSummary.numTargetRowsInserted === 0L) assert(mergeSummary.numTargetRowsUpdated === 2L) @@ -2524,7 +2524,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase // Row(5, 500, "executive") deleted ) - val mergeSummary = getMergeSummary() + val mergeSummary = getMergeSummary(groupFilterTimeMissing = true) assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) assert(mergeSummary.numTargetRowsInserted === 0L) assert(mergeSummary.numTargetRowsUpdated === 0L) @@ -2581,7 +2581,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase Row(5, -1, "executive"), // updated Row(6, -1, "dummy"))) // inserted - val mergeSummary = getMergeSummary() + val mergeSummary = getMergeSummary(groupFilterTimeMissing = true) assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) assert(mergeSummary.numTargetRowsInserted === 1L) assert(mergeSummary.numTargetRowsUpdated === 2L) @@ -2638,7 +2638,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase // Row(5, 500, "executive") deleted Row(6, -1, "dummy"))) // inserted - val mergeSummary = getMergeSummary() + val mergeSummary = getMergeSummary(groupFilterTimeMissing = true) assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) assert(mergeSummary.numTargetRowsInserted === 1L) assert(mergeSummary.numTargetRowsUpdated === 0L) @@ -2678,7 +2678,7 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase |""".stripMargin ) - val mergeMetrics = getMergeSummary() + val mergeMetrics = getMergeSummary(groupFilterTimeMissing = true) assert(mergeMetrics.numTargetRowsCopied === (if (deltaMerge) 0L else 3L)) assert(mergeMetrics.numTargetRowsInserted === 1L) assert(mergeMetrics.numTargetRowsUpdated === 0L) @@ -2794,10 +2794,22 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } } - private def getMergeSummary(): MergeSummary = { + protected def getMergeSummary(groupFilterTimeMissing: Boolean = false): MergeSummary = { val table = catalog.loadTable(ident) - table.asInstanceOf[InMemoryTable].commits.last.writeSummary.get + val summary = table.asInstanceOf[InMemoryTable].commits.last.writeSummary.get .asInstanceOf[MergeSummary] + assert(summary.executionTimeMs() >= 0, + s"Expected executionTimeMs >= 0, got ${summary.executionTimeMs()}") + if (groupFilterTimeMissing) { + assert(summary.groupFilterTimeMs() === -1L, + s"Expected groupFilterTimeMs == -1, got ${summary.groupFilterTimeMs()}") + } else { + assert(summary.groupFilterTimeMs() >= 0, + s"Expected groupFilterTimeMs >= 0, got ${summary.groupFilterTimeMs()}") + } + assert(summary.writeJobTimeMs() >= 0, + s"Expected writeJobTimeMs >= 0, got ${summary.writeJobTimeMs()}") + summary } private def assertNoLeftBroadcastOrReplication(query: String): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala index 6e9afe7abc97e..edfd152380229 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala @@ -39,13 +39,25 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { protected def checkUpdateMetrics( numUpdatedRows: Long, - numCopiedRows: Long): Unit = { + numCopiedRows: Long, + groupFilterTimeMissing: Boolean = false): Unit = { val summary = getUpdateSummary() assert(summary.numUpdatedRows() === numUpdatedRows, s"Expected numUpdatedRows=$numUpdatedRows, got ${summary.numUpdatedRows()}") val expectedCopied = if (deltaUpdate) 0L else numCopiedRows assert(summary.numCopiedRows() === expectedCopied, s"Expected numCopiedRows=$expectedCopied, got ${summary.numCopiedRows()}") + assert(summary.executionTimeMs() >= 0, + s"Expected executionTimeMs >= 0, got ${summary.executionTimeMs()}") + if (groupFilterTimeMissing) { + assert(summary.groupFilterTimeMs() === -1L, + s"Expected groupFilterTimeMs == -1, got ${summary.groupFilterTimeMs()}") + } else { + assert(summary.groupFilterTimeMs() >= 0, + s"Expected groupFilterTimeMs >= 0, got ${summary.groupFilterTimeMs()}") + } + assert(summary.writeJobTimeMs() >= 0, + s"Expected writeJobTimeMs >= 0, got ${summary.writeJobTimeMs()}") } test("update table containing added column with default value") { @@ -157,7 +169,7 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer(sql(s"SELECT * FROM $tableNameAsString"), Nil) - checkUpdateMetrics(numUpdatedRows = 0, numCopiedRows = 0) + checkUpdateMetrics(numUpdatedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaUpdate) } test("update with basic filters") { @@ -173,7 +185,7 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { sql(s"SELECT * FROM $tableNameAsString"), Row(1, 100, "invalid") :: Row(2, 200, "software") :: Row(3, 300, "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 1) + checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaUpdate) } test("update with aliases") { @@ -236,7 +248,7 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { sql(s"SELECT * FROM $tableNameAsString"), Row(1, 100, "hr") :: Row(2, 200, "hardware") :: Row(3, null, "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 0, numCopiedRows = 0) + checkUpdateMetrics(numUpdatedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaUpdate) } test("update with literal true condition") { @@ -252,7 +264,7 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { sql(s"SELECT * FROM $tableNameAsString"), Row(1, -1, "hr") :: Row(2, -1, "hardware") :: Row(3, -1, "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 3, numCopiedRows = 0) + checkUpdateMetrics(numUpdatedRows = 3, numCopiedRows = 0, groupFilterTimeMissing = true) } test("update without condition") { @@ -268,7 +280,7 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { sql(s"SELECT * FROM $tableNameAsString"), Row(1, -1, "hr") :: Row(2, -1, "hardware") :: Row(3, -1, "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 3, numCopiedRows = 0) + checkUpdateMetrics(numUpdatedRows = 3, numCopiedRows = 0, groupFilterTimeMissing = true) } test("update with NULL conditions on partition columns") { @@ -283,7 +295,7 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 100, null) :: Row(2, 200, "hr") :: Row(3, 300, "hardware") :: Nil) - checkUpdateMetrics(numUpdatedRows = 0, numCopiedRows = 0) + checkUpdateMetrics(numUpdatedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaUpdate) // should update one matching row with a null-safe condition sql(s"UPDATE $tableNameAsString SET salary = -1 WHERE dep <=> NULL") @@ -305,14 +317,14 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, null, "hr") :: Row(2, 200, "hr") :: Row(3, 300, "hardware") :: Nil) - checkUpdateMetrics(numUpdatedRows = 0, numCopiedRows = 0) + checkUpdateMetrics(numUpdatedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaUpdate) // should update one matching row with a null-safe condition sql(s"UPDATE $tableNameAsString SET dep = 'invalid' WHERE salary <=> NULL") checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, null, "invalid") :: Row(2, 200, "hr") :: Row(3, 300, "hardware") :: Nil) - checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 1) + checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaUpdate) } test("update with IN and NOT IN predicates") { @@ -332,7 +344,7 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, -1, "hr") :: Row(2, 200, "hardware") :: Row(3, null, "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 0, numCopiedRows = 0) + checkUpdateMetrics(numUpdatedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaUpdate) sql(s"UPDATE $tableNameAsString SET salary = 100 WHERE salary NOT IN (1, 10)") checkAnswer( @@ -394,7 +406,7 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, Row(-1, Row(Seq(-1), Map("k" -> "v"))), "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 0) + checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = true) // set primitive, array, map columns to NULL (proper casts should be in inserted) sql(s"UPDATE $tableNameAsString SET s.c1 = NULL, s.c2 = NULL WHERE pk = 1") @@ -411,7 +423,7 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, Row(1, Row(Seq(1), null)), "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 0) + checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = true) } test("update fields inside NULL structs") { @@ -521,7 +533,8 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "invalid") :: Row(2, 2, "invalid") :: Row(3, null, "invalid") :: Nil) - checkUpdateMetrics(numUpdatedRows = 2, numCopiedRows = 0) + checkUpdateMetrics( + numUpdatedRows = 2, numCopiedRows = 0, groupFilterTimeMissing = deltaUpdate) } } @@ -574,7 +587,8 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 0, numCopiedRows = 0) + checkUpdateMetrics( + numUpdatedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaUpdate) sql( s"""UPDATE $tableNameAsString @@ -585,7 +599,8 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "invalid") :: Row(2, 2, "invalid") :: Row(3, null, "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 2, numCopiedRows = 1) + checkUpdateMetrics( + numUpdatedRows = 2, numCopiedRows = 1, groupFilterTimeMissing = deltaUpdate) sql( s"""UPDATE $tableNameAsString @@ -625,7 +640,8 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 0, numCopiedRows = 0) + checkUpdateMetrics( + numUpdatedRows = 0, numCopiedRows = 0, groupFilterTimeMissing = deltaUpdate) sql( s"""UPDATE $tableNameAsString t @@ -636,7 +652,8 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "invalid") :: Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 1) + checkUpdateMetrics( + numUpdatedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaUpdate) sql( s"""UPDATE $tableNameAsString t @@ -647,7 +664,8 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "invalid") :: Row(2, 2, "hardware") :: Row(3, null, "invalid") :: Nil) - checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 0) + checkUpdateMetrics( + numUpdatedRows = 1, numCopiedRows = 0, groupFilterTimeMissing = deltaUpdate) sql( s"""UPDATE $tableNameAsString t @@ -700,7 +718,8 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "hr") :: Row(2, 2, "invalid") :: Row(3, null, "invalid") :: Nil) - checkUpdateMetrics(numUpdatedRows = 2, numCopiedRows = 1) + checkUpdateMetrics( + numUpdatedRows = 2, numCopiedRows = 1, groupFilterTimeMissing = deltaUpdate) sql( s"""UPDATE $tableNameAsString t @@ -713,7 +732,8 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { checkAnswer( sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "invalid") :: Row(2, 2, "invalid") :: Row(3, null, "invalid") :: Nil) - checkUpdateMetrics(numUpdatedRows = 3, numCopiedRows = 0) + checkUpdateMetrics( + numUpdatedRows = 3, numCopiedRows = 0, groupFilterTimeMissing = deltaUpdate) } } @@ -739,7 +759,8 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { sql(s"SELECT * FROM $tableNameAsString"), Row(1, 1, "invalid") :: Row(2, 2, "hardware") :: Row(3, null, "hr") :: Nil) - checkUpdateMetrics(numUpdatedRows = 1, numCopiedRows = 1) + checkUpdateMetrics( + numUpdatedRows = 1, numCopiedRows = 1, groupFilterTimeMissing = deltaUpdate) } }