Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,17 @@ public interface DeleteSummary extends WriteSummary {
* Returns the number of rows copied unmodified, or -1 if not found.
*/
long numCopiedRows();

/**
* Returns the time, in milliseconds, spent in the runtime group-filter subquery that identifies
* which target files contain rows matching the delete condition. Returns -1 when no such
* subquery was injected.
*/
long groupFilterTimeMs();

/**
* Returns the wall-clock time, in milliseconds, of the main write job that scans pruned
* target files, applies the delete, and writes output.
*/
long writeJobTimeMs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,17 @@ public interface MergeSummary extends WriteSummary {
* or -1 if not found.
*/
long numTargetRowsNotMatchedBySourceDeleted();

/**
* Returns the time, in milliseconds, spent in the runtime group-filter subquery that identifies
* which target files contain rows matching the merge condition. Returns -1 when no such
* subquery was injected.
*/
long groupFilterTimeMs();

/**
* Returns the wall-clock time, in milliseconds, of the main write job that scans pruned
* target files, applies the merge logic, and writes output.
*/
long writeJobTimeMs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,17 @@ public interface UpdateSummary extends WriteSummary {
* Returns the number of rows copied unmodified, or -1 if not found.
*/
long numCopiedRows();

/**
* Returns the time, in milliseconds, spent in the runtime group-filter subquery that identifies
* which target files contain rows matching the update condition. Returns -1 when no such
* subquery was injected.
*/
long groupFilterTimeMs();

/**
* Returns the wall-clock time, in milliseconds, of the main write job that scans pruned
* target files, applies the updates, and writes output.
*/
long writeJobTimeMs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,10 @@
*/
@Evolving
public interface WriteSummary {

/**
* Returns the total time, in milliseconds, taken to execute the write operation up to (but not
* including) the connector commit.
*
* @return the execution time of the write operation in milliseconds, excluding the connector
* commit
*/
long executionTimeMs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ package org.apache.spark.sql.connector.write
*/
private[sql] case class DeleteSummaryImpl(
numDeletedRows: Long,
numCopiedRows: Long)
numCopiedRows: Long,
executionTimeMs: Long,
groupFilterTimeMs: Long,
writeJobTimeMs: Long)
extends DeleteSummary {
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ package org.apache.spark.sql.connector.write
/**
* Implementation of [[InsertSummary]] that provides INSERT operation summary.
*/
private[sql] case class InsertSummaryImpl(numInsertedRows: Long) extends InsertSummary {
private[sql] case class InsertSummaryImpl(
numInsertedRows: Long,
executionTimeMs: Long)
extends InsertSummary {
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ private[sql] case class MergeSummaryImpl(
numTargetRowsMatchedUpdated: Long,
numTargetRowsMatchedDeleted: Long,
numTargetRowsNotMatchedBySourceUpdated: Long,
numTargetRowsNotMatchedBySourceDeleted: Long)
numTargetRowsNotMatchedBySourceDeleted: Long,
executionTimeMs: Long,
groupFilterTimeMs: Long,
writeJobTimeMs: Long)
extends MergeSummary {
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ package org.apache.spark.sql.connector.write
*/
private[sql] case class UpdateSummaryImpl(
numUpdatedRows: Long,
numCopiedRows: Long)
numCopiedRows: Long,
executionTimeMs: Long,
groupFilterTimeMs: Long,
writeJobTimeMs: Long)
extends UpdateSummary {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources.v2

import java.util.concurrent.TimeUnit

import scala.jdk.CollectionConverters._

import org.apache.spark.{SparkEnv, SparkException, TaskContext}
Expand All @@ -36,7 +38,7 @@ import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeleteSummaryImpl, DeltaWrite, DeltaWriter, InsertSummaryImpl, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperation, RowLevelOperationTable, UpdateSummaryImpl, Write, WriterCommitMessage, WriteSummary}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command._
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution, UnaryExecNode}
import org.apache.spark.sql.execution.{ExecSubqueryExpression, QueryExecution, SparkPlan, SQLExecution, UnaryExecNode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLLastAttemptMetric, SQLLastAttemptMetrics, SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.StructType
Expand Down Expand Up @@ -296,7 +298,9 @@ case class AppendDataExec(
copy(query = newChild)

override protected def getWriteSummary(): Option[WriteSummary] =
Some(InsertSummaryImpl(numInsertedRows = numOutputRowsMetric.value))
Some(InsertSummaryImpl(
numInsertedRows = numOutputRowsMetric.value,
executionTimeMs = executionTimeMsMetric.value))
}

/**
Expand Down Expand Up @@ -324,7 +328,10 @@ case class InsertOnlyMergeExec(
numTargetRowsMatchedUpdated = 0L,
numTargetRowsMatchedDeleted = 0L,
numTargetRowsNotMatchedBySourceUpdated = 0L,
numTargetRowsNotMatchedBySourceDeleted = 0L))
numTargetRowsNotMatchedBySourceDeleted = 0L,
executionTimeMs = executionTimeMsMetric.value,
groupFilterTimeMs = groupFilterTimeMs,
writeJobTimeMs = writeJobTimeMsMetric.value))
}

/**
Expand Down Expand Up @@ -558,20 +565,29 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec {
getMetricValue(metrics, "numTargetRowsMatchedUpdated"),
getMetricValue(metrics, "numTargetRowsMatchedDeleted"),
getMetricValue(metrics, "numTargetRowsNotMatchedBySourceUpdated"),
getMetricValue(metrics, "numTargetRowsNotMatchedBySourceDeleted"))
getMetricValue(metrics, "numTargetRowsNotMatchedBySourceDeleted"),
executionTimeMsMetric.value,
groupFilterTimeMs,
writeJobTimeMsMetric.value)
}
}

protected def getUpdateSummary(): Option[UpdateSummaryImpl] = {
Some(UpdateSummaryImpl(
getMetricValue(sparkMetrics, "numUpdatedRows"),
getMetricValue(sparkMetrics, "numCopiedRows")))
getMetricValue(sparkMetrics, "numCopiedRows"),
executionTimeMsMetric.value,
groupFilterTimeMs,
writeJobTimeMsMetric.value))
}

protected def getDeleteSummary(): Option[DeleteSummaryImpl] = {
Some(DeleteSummaryImpl(
getMetricValue(sparkMetrics, "numDeletedRows"),
getMetricValue(sparkMetrics, "numCopiedRows")))
getMetricValue(sparkMetrics, "numCopiedRows"),
executionTimeMsMetric.value,
groupFilterTimeMs,
writeJobTimeMsMetric.value))
}
}

Expand All @@ -596,11 +612,24 @@ trait V2TableWriteExec

protected lazy val numOutputRowsMetric: SQLMetric =
SQLMetrics.createMetric(sparkContext, "number of output rows")
protected lazy val executionTimeMsMetric: SQLMetric =
SQLMetrics.createTimingMetric(sparkContext, "execution time")
protected lazy val groupFilterTimeMsMetric: SQLMetric =
SQLMetrics.createTimingMetric(sparkContext, "group filter time")
protected lazy val writeJobTimeMsMetric: SQLMetric =
SQLMetrics.createTimingMetric(sparkContext, "write job time")

// Backing value for `groupFilterTimeMs` in the WriteSummary. Unlike an SQLMetric, it can be -1.
protected var groupFilterTimeMs: Long = -1L

override protected def sparkMetrics: Map[String, SQLMetric] = Map(
"numOutputRows" -> numOutputRowsMetric)
"numOutputRows" -> numOutputRowsMetric,
"executionTimeMs" -> executionTimeMsMetric,
"groupFilterTimeMs" -> groupFilterTimeMsMetric,
"writeJobTimeMs" -> writeJobTimeMsMetric)

protected def writeWithV2(batchWrite: BatchWrite): Seq[InternalRow] = {
val startNanos = System.nanoTime()
val rdd: RDD[InternalRow] = {
val tempRdd = query.execute()
// SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
Expand All @@ -626,6 +655,7 @@ trait V2TableWriteExec
val writeMetrics: Map[String, SQLMetric] = customMetrics

try {
val rewriteStartNanos = System.nanoTime()
sparkContext.runJob(
rdd,
(context: TaskContext, iter: Iterator[InternalRow]) =>
Expand All @@ -638,9 +668,21 @@ trait V2TableWriteExec
batchWrite.onDataWriterCommit(commitMessage)
}
)

executionTimeMsMetric.set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos))
writeJobTimeMsMetric.set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - rewriteStartNanos))
groupFilterTimeMs = collectRowLevelGroupFilterTimeMs(query)
// SQLMetric.set ignores negative values, so the metric stays unset (and reads 0 in the UI)
// when no subquery was injected.
groupFilterTimeMsMetric.set(groupFilterTimeMs)
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, Seq(numOutputRowsMetric))
SQLMetrics.postDriverMetricUpdates(
sparkContext,
executionId,
Seq(
numOutputRowsMetric,
executionTimeMsMetric,
groupFilterTimeMsMetric,
writeJobTimeMsMetric))

val writeSummary = getWriteSummary()
logInfo(log"Data source write support ${MDC(LogKeys.BATCH_WRITE, batchWrite)} is committing.")
Expand Down Expand Up @@ -671,6 +713,27 @@ trait V2TableWriteExec
}

protected def getWriteSummary(): Option[WriteSummary] = None

/**
 * Adds up the `collectTime` metric of every subquery expression found in the runtime filters
 * of scans over a [[RowLevelOperationTable]], i.e. the subqueries injected by
 * `RowLevelOperationRuntimeGroupFiltering` to find the target files that contain rows matching
 * the row-level operation condition.
 *
 * @param plan the physical plan to search
 * @return the summed `collectTime` in milliseconds, or -1 when no such subquery was injected
 */
private def collectRowLevelGroupFilterTimeMs(plan: SparkPlan): Long = {
  // One entry per subquery expression encountered; an empty result means "nothing injected".
  val subqueryTimesMs = Vector.newBuilder[Long]
  foreach(plan) {
    case scan: BatchScanExec =>
      scan.table match {
        case _: RowLevelOperationTable =>
          for (filter <- scan.runtimeFilters) {
            filter.foreach {
              case subquery: ExecSubqueryExpression =>
                // A subquery without a `collectTime` metric still counts as found and adds 0.
                subqueryTimesMs += subquery.plan.metrics.get("collectTime").fold(0L)(_.value)
              case _ =>
            }
          }
        case _ =>
      }
    case _ =>
  }
  val collected = subqueryTimesMs.result()
  if (collected.isEmpty) -1L else collected.sum
}
}

trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with Serializable {
Expand Down
Loading