diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java index 12f38d84e5e..ce5bec16a7f 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/Metrics.java @@ -16,209 +16,21 @@ */ package org.apache.gluten.metrics; -import org.apache.gluten.exception.GlutenException; - public class Metrics implements IMetrics { - public long[] inputRows; - public long[] inputVectors; - public long[] inputBytes; - public long[] rawInputRows; - public long[] rawInputBytes; - public long[] outputRows; - public long[] outputVectors; - public long[] outputBytes; - public long[] cpuCount; - public long[] wallNanos; - public long[] scanTime; - public long[] peakMemoryBytes; - public long[] numMemoryAllocations; - public long[] spilledInputBytes; - public long[] spilledBytes; - public long[] spilledRows; - public long[] spilledPartitions; - public long[] spilledFiles; - public long[] numDynamicFiltersProduced; - public long[] numDynamicFiltersAccepted; - public long[] numReplacedWithDynamicFilterRows; - public long[] numDynamicFilterInputRows; - public long[] flushRowCount; - public long[] abandonedPartialAggregationRows; - public long[] loadedToValueHook; - public long[] bloomFilterBlocksByteSize; - public long[] skippedSplits; - public long[] processedSplits; - public long[] skippedStrides; - public long[] processedStrides; - public long[] remainingFilterTime; - public long[] ioWaitTime; - public long[] storageReadBytes; - public long[] storageReads; - public long[] localReadBytes; - public long[] ramReadBytes; - public long[] preloadSplits; - public long[] pageLoadTime; - public long[] dataSourceAddSplitTime; - public long[] dataSourceReadTime; - - public long[] physicalWrittenBytes; - public long[] writeIOTime; - public long[] numWrittenFiles; - - public long[] loadLazyVectorTime; - - public SingleMetric 
singleMetric = new SingleMetric(); + public final String metricsJson; + public final int numMetrics; + public final SingleMetric singleMetric = new SingleMetric(); public String taskStats; /** Create an instance for native metrics. */ - public Metrics( - long[] inputRows, - long[] inputVectors, - long[] inputBytes, - long[] rawInputRows, - long[] rawInputBytes, - long[] outputRows, - long[] outputVectors, - long[] outputBytes, - long[] cpuCount, - long[] wallNanos, - long veloxToArrow, - long[] peakMemoryBytes, - long[] numMemoryAllocations, - long[] spilledInputBytes, - long[] spilledBytes, - long[] spilledRows, - long[] spilledPartitions, - long[] spilledFiles, - long[] numDynamicFiltersProduced, - long[] numDynamicFiltersAccepted, - long[] numReplacedWithDynamicFilterRows, - long[] numDynamicFilterInputRows, - long[] flushRowCount, - long[] abandonedPartialAggregationRows, - long[] loadedToValueHook, - long[] bloomFilterBlocksByteSize, - long[] scanTime, - long[] skippedSplits, - long[] processedSplits, - long[] skippedStrides, - long[] processedStrides, - long[] remainingFilterTime, - long[] ioWaitTime, - long[] storageReadBytes, - long[] storageReads, - long[] localReadBytes, - long[] ramReadBytes, - long[] preloadSplits, - long[] pageLoadTime, - long[] dataSourceAddSplitTime, - long[] dataSourceReadTime, - long[] physicalWrittenBytes, - long[] writeIOTime, - long[] numWrittenFiles, - long[] loadLazyVectorTime, - String taskStats) { - this.inputRows = inputRows; - this.inputVectors = inputVectors; - this.inputBytes = inputBytes; - this.rawInputRows = rawInputRows; - this.rawInputBytes = rawInputBytes; - this.outputRows = outputRows; - this.outputVectors = outputVectors; - this.outputBytes = outputBytes; - this.cpuCount = cpuCount; - this.wallNanos = wallNanos; - this.scanTime = scanTime; + public Metrics(String metricsJson, int numMetrics, long veloxToArrow, String taskStats) { + this.metricsJson = metricsJson; + this.numMetrics = numMetrics; 
this.singleMetric.veloxToArrow = veloxToArrow; - this.peakMemoryBytes = peakMemoryBytes; - this.numMemoryAllocations = numMemoryAllocations; - this.spilledInputBytes = spilledInputBytes; - this.spilledBytes = spilledBytes; - this.spilledRows = spilledRows; - this.spilledPartitions = spilledPartitions; - this.spilledFiles = spilledFiles; - this.numDynamicFiltersProduced = numDynamicFiltersProduced; - this.numDynamicFiltersAccepted = numDynamicFiltersAccepted; - this.numReplacedWithDynamicFilterRows = numReplacedWithDynamicFilterRows; - this.numDynamicFilterInputRows = numDynamicFilterInputRows; - this.flushRowCount = flushRowCount; - this.abandonedPartialAggregationRows = abandonedPartialAggregationRows; - this.loadedToValueHook = loadedToValueHook; - this.bloomFilterBlocksByteSize = bloomFilterBlocksByteSize; - this.skippedSplits = skippedSplits; - this.processedSplits = processedSplits; - this.skippedStrides = skippedStrides; - this.processedStrides = processedStrides; - this.remainingFilterTime = remainingFilterTime; - this.ioWaitTime = ioWaitTime; - this.storageReadBytes = storageReadBytes; - this.storageReads = storageReads; - this.localReadBytes = localReadBytes; - this.ramReadBytes = ramReadBytes; - this.preloadSplits = preloadSplits; - this.pageLoadTime = pageLoadTime; - this.dataSourceAddSplitTime = dataSourceAddSplitTime; - this.dataSourceReadTime = dataSourceReadTime; - - this.physicalWrittenBytes = physicalWrittenBytes; - this.writeIOTime = writeIOTime; - this.numWrittenFiles = numWrittenFiles; - this.loadLazyVectorTime = loadLazyVectorTime; this.taskStats = taskStats; } - public OperatorMetrics getOperatorMetrics(int index) { - if (index >= inputRows.length) { - throw new GlutenException("Invalid index."); - } - - return new OperatorMetrics( - inputRows[index], - inputVectors[index], - inputBytes[index], - rawInputRows[index], - rawInputBytes[index], - outputRows[index], - outputVectors[index], - outputBytes[index], - cpuCount[index], - 
wallNanos[index], - peakMemoryBytes[index], - numMemoryAllocations[index], - spilledInputBytes[index], - spilledBytes[index], - spilledRows[index], - spilledPartitions[index], - spilledFiles[index], - numDynamicFiltersProduced[index], - numDynamicFiltersAccepted[index], - numReplacedWithDynamicFilterRows[index], - numDynamicFilterInputRows[index], - flushRowCount[index], - abandonedPartialAggregationRows[index], - loadedToValueHook[index], - bloomFilterBlocksByteSize[index], - scanTime[index], - skippedSplits[index], - processedSplits[index], - skippedStrides[index], - processedStrides[index], - remainingFilterTime[index], - ioWaitTime[index], - storageReadBytes[index], - storageReads[index], - localReadBytes[index], - ramReadBytes[index], - preloadSplits[index], - pageLoadTime[index], - dataSourceAddSplitTime[index], - dataSourceReadTime[index], - physicalWrittenBytes[index], - writeIOTime[index], - numWrittenFiles[index], - loadLazyVectorTime[index]); - } - public SingleMetric getSingleMetrics() { return singleMetric; } diff --git a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java index 18feed0b9e1..bf541677116 100644 --- a/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java +++ b/backends-velox/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java @@ -64,6 +64,9 @@ public class OperatorMetrics implements IOperatorMetrics { public long loadLazyVectorTime; + /** Create an empty instance for operator metrics. */ + public OperatorMetrics() {} + /** Create an instance for operator metrics. 
*/ public OperatorMetrics( long inputRows, diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index 24930ea153e..db97cef7253 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -16,6 +16,7 @@ */ package org.apache.gluten.metrics +import org.apache.gluten.exception.GlutenException import org.apache.gluten.execution._ import org.apache.gluten.substrait.{AggregationParams, JoinParams} @@ -23,10 +24,124 @@ import org.apache.spark.internal.Logging import org.apache.spark.metrics.TaskStatsAccumulator import org.apache.spark.sql.execution.SparkPlan +import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper} + import java.lang.{Long => JLong} import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} +import scala.collection.JavaConverters._ + object MetricsUtil extends Logging { + private val objectMapper = new ObjectMapper() + + private def value(node: JsonNode, fieldName: String): Long = + Option(node.get(fieldName)).map(_.asLong()).getOrElse(0L) + + private def customMetricSum(node: JsonNode, metricName: String): Long = + Option(node.get("customStats")) + .flatMap(customStats => Option(customStats.get(metricName))) + .flatMap(metric => Option(metric.get("sum"))) + .map(_.asLong()) + .getOrElse(0L) + + private def customMetricCount(node: JsonNode, metricName: String): Long = + Option(node.get("customStats")) + .flatMap(customStats => Option(customStats.get(metricName))) + .flatMap(metric => Option(metric.get("count"))) + .map(_.asLong()) + .getOrElse(0L) + + private def operatorMetricFromJson(node: JsonNode): OperatorMetrics = { + val metrics = new OperatorMetrics() + metrics.inputRows = value(node, "inputRows") + metrics.inputVectors = value(node, "inputVectors") + metrics.inputBytes = value(node, "inputBytes") + 
metrics.rawInputRows = value(node, "rawInputRows") + metrics.rawInputBytes = value(node, "rawInputBytes") + metrics.outputRows = value(node, "outputRows") + metrics.outputVectors = value(node, "outputVectors") + metrics.outputBytes = value(node, "outputBytes") + metrics.cpuCount = value(node, "cpuCount") + metrics.wallNanos = value(node, "wallNanos") + metrics.peakMemoryBytes = value(node, "peakMemoryBytes") + metrics.numMemoryAllocations = value(node, "numMemoryAllocations") + metrics.spilledInputBytes = value(node, "spilledInputBytes") + metrics.spilledBytes = value(node, "spilledBytes") + metrics.spilledRows = value(node, "spilledRows") + metrics.spilledPartitions = value(node, "spilledPartitions") + metrics.spilledFiles = value(node, "spilledFiles") + metrics.numDynamicFiltersProduced = customMetricSum(node, "dynamicFiltersProduced") + metrics.numDynamicFiltersAccepted = customMetricSum(node, "dynamicFiltersAccepted") + metrics.numReplacedWithDynamicFilterRows = + customMetricSum(node, "replacedWithDynamicFilterRows") + metrics.numDynamicFilterInputRows = customMetricSum(node, "dynamicFilterInputRows") + metrics.flushRowCount = customMetricSum(node, "flushRowCount") + metrics.abandonedPartialAggregationRows = + customMetricSum(node, "abandonedPartialAggregationRows") + metrics.loadedToValueHook = customMetricSum(node, "loadedToValueHook") + metrics.bloomFilterBlocksByteSize = customMetricSum(node, "bloomFilterSize") + metrics.scanTime = customMetricSum(node, "totalScanTime") + metrics.skippedSplits = customMetricSum(node, "skippedSplits") + metrics.processedSplits = customMetricSum(node, "processedSplits") + metrics.skippedStrides = customMetricSum(node, "skippedStrides") + metrics.processedStrides = customMetricSum(node, "processedStrides") + metrics.remainingFilterTime = customMetricSum(node, "totalRemainingFilterWallNanos") + metrics.ioWaitTime = customMetricSum(node, "ioWaitWallNanos") + metrics.storageReadBytes = customMetricSum(node, "storageReadBytes") + 
metrics.storageReads = customMetricCount(node, "storageReadBytes") + metrics.localReadBytes = customMetricSum(node, "localReadBytes") + metrics.ramReadBytes = customMetricSum(node, "ramReadBytes") + metrics.preloadSplits = customMetricSum(node, "readyPreloadedSplits") + metrics.pageLoadTime = customMetricSum(node, "pageLoadTimeNs") + metrics.dataSourceAddSplitTime = customMetricSum(node, "dataSourceAddSplitWallNanos") + + customMetricSum(node, "waitForPreloadSplitNanos") + metrics.dataSourceReadTime = customMetricSum(node, "dataSourceReadWallNanos") + metrics.physicalWrittenBytes = value(node, "physicalWrittenBytes") + metrics.writeIOTime = customMetricSum(node, "writeIOWallNanos") + metrics.numWrittenFiles = customMetricSum(node, "numWrittenFiles") + metrics + } + + private def parseNativeOperatorMetrics(metrics: Metrics): JArrayList[OperatorMetrics] = { + val operatorMetrics = new JArrayList[OperatorMetrics]() + if (metrics.numMetrics == 0 || metrics.metricsJson == null || metrics.metricsJson.isEmpty) { + return operatorMetrics + } + + val root = objectMapper.readTree(metrics.metricsJson) + val omittedNodeIds = root.path("omittedNodeIds").elements().asScala.map(_.asText()).toSet + val nodeStats = root.path("nodeStats") + val orderedNodeIds = root.path("orderedNodeIds") + orderedNodeIds.elements().asScala.foreach { + nodeIdNode => + val nodeId = nodeIdNode.asText() + val node = nodeStats.get(nodeId) + if (node == null || node.isNull || node.isMissingNode) { + if (omittedNodeIds.contains(nodeId)) { + operatorMetrics.add(new OperatorMetrics()) + } else { + throw new GlutenException(s"Node id cannot be found in native metrics: $nodeId.") + } + } else { + node.path("operatorStats").elements().asScala.foreach { + opStats => operatorMetrics.add(operatorMetricFromJson(opStats)) + } + } + } + + val loadLazyVectorMetricsIdx = orderedNodeIds.size() - 1 + if (loadLazyVectorMetricsIdx >= 0 && loadLazyVectorMetricsIdx < operatorMetrics.size()) { + 
operatorMetrics.get(loadLazyVectorMetricsIdx).loadLazyVectorTime = + root.path("loadLazyVectorTime").asLong(0L) + } + + if (operatorMetrics.size() != metrics.numMetrics) { + throw new GlutenException( + s"Unexpected native metrics size. Expected ${metrics.numMetrics}, " + + s"got ${operatorMetrics.size()}.") + } + operatorMetrics + } /** * Generate the function which updates metrics fetched from certain iterator to transformers. @@ -240,7 +355,8 @@ object MetricsUtil extends Logging { mutNode: MetricsUpdaterTree, relMap: JMap[JLong, JList[JLong]], operatorIdx: JLong, - metrics: Metrics, + nativeMetrics: JList[OperatorMetrics], + singleMetrics: Metrics.SingleMetric, metricsIdx: Int, joinParamsMap: JMap[JLong, JoinParams], aggParamsMap: JMap[JLong, AggregationParams]): (JLong, Int) = { @@ -253,7 +369,7 @@ object MetricsUtil extends Logging { .get(operatorIdx) .forEach( _ => { - operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx)) + operatorMetrics.add(nativeMetrics.get(curMetricsIdx)) curMetricsIdx -= 1 }) @@ -264,22 +380,22 @@ object MetricsUtil extends Logging { p.postProjectionNeeded = false p } - smj.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, joinParams) + smj.updateJoinMetrics(operatorMetrics, singleMetrics, joinParams) case ju: JoinMetricsUpdaterBase => // JoinRel and CrossRel output two suites of metrics respectively for build and probe. // Therefore, fetch one more suite of metrics here. - operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx)) + operatorMetrics.add(nativeMetrics.get(curMetricsIdx)) curMetricsIdx -= 1 val joinParams = Option(joinParamsMap.get(operatorIdx)).getOrElse { val p = JoinParams() p.postProjectionNeeded = false p } - ju.updateJoinMetrics(operatorMetrics, metrics.getSingleMetrics, joinParams) + ju.updateJoinMetrics(operatorMetrics, singleMetrics, joinParams) case u: UnionMetricsUpdater => // JoinRel outputs two suites of metrics respectively for hash build and hash probe. 
// Therefore, fetch one more suite of metrics here. - operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx)) + operatorMetrics.add(nativeMetrics.get(curMetricsIdx)) curMetricsIdx -= 1 u.updateUnionMetrics(operatorMetrics) case hau: HashAggregateMetricsUpdater => @@ -315,7 +431,8 @@ object MetricsUtil extends Logging { child, relMap, newOperatorIdx, - metrics, + nativeMetrics, + singleMetrics, newMetricsIdx, joinParamsMap, aggParamsMap) @@ -354,7 +471,8 @@ object MetricsUtil extends Logging { taskStatsAccumulator: TaskStatsAccumulator): IMetrics => Unit = { imetrics => val metrics = imetrics.asInstanceOf[Metrics] - val numNativeMetrics = metrics.inputRows.length + val nativeMetrics = parseNativeOperatorMetrics(metrics) + val numNativeMetrics = nativeMetrics.size() if (numNativeMetrics == 0) { () } else { @@ -362,7 +480,8 @@ object MetricsUtil extends Logging { mutNode, relMap, operatorIdx, - metrics, + nativeMetrics, + metrics.getSingleMetrics, numNativeMetrics - 1, joinParamsMap, aggParamsMap) diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index a726d3be916..fbe6b09a322 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -273,11 +273,8 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;"); - metricsBuilderConstructor = getMethodIdOrError( - env, - metricsBuilderClass, - "", - "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[JLjava/lang/String;)V"); + metricsBuilderConstructor = + getMethodIdOrError(env, metricsBuilderClass, "", "(Ljava/lang/String;IJLjava/lang/String;)V"); nativeColumnarToRowInfoClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;"); @@ -305,6 +302,7 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { env->DeleteGlobalRef(splitResultClass); env->DeleteGlobalRef(nativeColumnarToRowInfoClass); 
env->DeleteGlobalRef(byteArrayClass); + env->DeleteGlobalRef(metricsBuilderClass); env->DeleteGlobalRef(jniUnsafeByteBufferClass); env->DeleteGlobalRef(shuffleReaderMetricsClass); @@ -561,63 +559,15 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp numMetrics = metrics->numMetrics; } - jlongArray longArray[Metrics::kNum]; - for (auto i = static_cast(Metrics::kBegin); i != static_cast(Metrics::kEnd); ++i) { - longArray[i] = env->NewLongArray(numMetrics); - if (metrics) { - env->SetLongArrayRegion(longArray[i], 0, numMetrics, metrics->get((Metrics::TYPE)i)); - } - } - + jstring metricsJson = env->NewStringUTF(metrics ? metrics->json.c_str() : ""); + jstring taskStats = metrics && metrics->stats.has_value() ? env->NewStringUTF(metrics->stats->c_str()) : nullptr; return env->NewObject( metricsBuilderClass, metricsBuilderConstructor, - longArray[Metrics::kInputRows], - longArray[Metrics::kInputVectors], - longArray[Metrics::kInputBytes], - longArray[Metrics::kRawInputRows], - longArray[Metrics::kRawInputBytes], - longArray[Metrics::kOutputRows], - longArray[Metrics::kOutputVectors], - longArray[Metrics::kOutputBytes], - longArray[Metrics::kCpuCount], - longArray[Metrics::kWallNanos], + metricsJson, + static_cast(numMetrics), metrics ? 
metrics->veloxToArrow : -1, - longArray[Metrics::kPeakMemoryBytes], - longArray[Metrics::kNumMemoryAllocations], - longArray[Metrics::kSpilledInputBytes], - longArray[Metrics::kSpilledBytes], - longArray[Metrics::kSpilledRows], - longArray[Metrics::kSpilledPartitions], - longArray[Metrics::kSpilledFiles], - longArray[Metrics::kNumDynamicFiltersProduced], - longArray[Metrics::kNumDynamicFiltersAccepted], - longArray[Metrics::kNumReplacedWithDynamicFilterRows], - longArray[Metrics::kNumDynamicFilterInputRows], - longArray[Metrics::kFlushRowCount], - longArray[Metrics::kAbandonedPartialAggregationRows], - longArray[Metrics::kLoadedToValueHook], - longArray[Metrics::kBloomFilterBlocksByteSize], - longArray[Metrics::kScanTime], - longArray[Metrics::kSkippedSplits], - longArray[Metrics::kProcessedSplits], - longArray[Metrics::kSkippedStrides], - longArray[Metrics::kProcessedStrides], - longArray[Metrics::kRemainingFilterTime], - longArray[Metrics::kIoWaitTime], - longArray[Metrics::kStorageReadBytes], - longArray[Metrics::kStorageReads], - longArray[Metrics::kLocalReadBytes], - longArray[Metrics::kRamReadBytes], - longArray[Metrics::kPreloadSplits], - longArray[Metrics::kPageLoadTime], - longArray[Metrics::kDataSourceAddSplitWallNanos], - longArray[Metrics::kDataSourceReadWallNanos], - longArray[Metrics::kPhysicalWrittenBytes], - longArray[Metrics::kWriteIOTime], - longArray[Metrics::kNumWrittenFiles], - longArray[Metrics::kLoadLazyVectorTime], - metrics && metrics->stats.has_value() ? env->NewStringUTF(metrics->stats->c_str()) : nullptr); + taskStats); JNI_METHOD_END(nullptr) } diff --git a/cpp/core/utils/Metrics.h b/cpp/core/utils/Metrics.h index 67c0b485c72..f6e0365f97f 100644 --- a/cpp/core/utils/Metrics.h +++ b/cpp/core/utils/Metrics.h @@ -17,7 +17,9 @@ #pragma once -#include +#include +#include +#include namespace gluten { @@ -25,96 +27,17 @@ struct Metrics { unsigned int numMetrics = 0; long veloxToArrow = 0; - // The underlying memory buffer. 
- std::unique_ptr array; - - // Point to array.get() after the above unique_ptr created. - long* arrayRawPtr = nullptr; - + // Structured metrics payload produced by the backend and decoded on JVM side. + std::string json; // Optional stats string. std::optional stats = std::nullopt; - enum TYPE { - // Begin from 0. - kBegin = 0, - - kInputRows = kBegin, - kInputVectors, - kInputBytes, - - kRawInputRows, - kRawInputBytes, - - kOutputRows, - kOutputVectors, - kOutputBytes, - - // CpuWallTiming. - kCpuCount, - kWallNanos, - - kPeakMemoryBytes, - kNumMemoryAllocations, - - // Spill. - kSpilledInputBytes, - kSpilledBytes, - kSpilledRows, - kSpilledPartitions, - kSpilledFiles, - - // Runtime metrics. - kNumDynamicFiltersProduced, - kNumDynamicFiltersAccepted, - kNumReplacedWithDynamicFilterRows, - kNumDynamicFilterInputRows, - kFlushRowCount, - kAbandonedPartialAggregationRows, - kLoadedToValueHook, - kBloomFilterBlocksByteSize, - kScanTime, - kSkippedSplits, - kProcessedSplits, - kSkippedStrides, - kProcessedStrides, - kRemainingFilterTime, - kIoWaitTime, - kStorageReadBytes, - kStorageReads, - kLocalReadBytes, - kRamReadBytes, - kPreloadSplits, - kPageLoadTime, - kDataSourceAddSplitWallNanos, - kDataSourceReadWallNanos, - - // Write metrics. - kPhysicalWrittenBytes, - kWriteIOTime, - kNumWrittenFiles, - - // Load lazy vector. - kLoadLazyVectorTime, - - // The end of enum items. 
- kEnd, - kNum = kEnd - kBegin - }; - - Metrics(unsigned int numMetrics) : numMetrics(numMetrics), array(new long[numMetrics * kNum]) { - arrayRawPtr = array.get(); - } + Metrics(unsigned int numMetrics, std::string json) : numMetrics(numMetrics), json(std::move(json)) {} Metrics(const Metrics&) = delete; Metrics(Metrics&&) = delete; Metrics& operator=(const Metrics&) = delete; Metrics& operator=(Metrics&&) = delete; - - long* get(TYPE type) { - assert(static_cast(type) >= static_cast(kBegin) && static_cast(type) < static_cast(kEnd)); - auto offset = (static_cast(type) - static_cast(kBegin)) * numMetrics; - return &arrayRawPtr[offset]; - } }; } // namespace gluten diff --git a/cpp/core/utils/tac/ffor.hpp b/cpp/core/utils/tac/ffor.hpp index 761a1ec2623..0d632efff5a 100644 --- a/cpp/core/utils/tac/ffor.hpp +++ b/cpp/core/utils/tac/ffor.hpp @@ -437,8 +437,7 @@ inline size_t decompress64Impl(const uint8_t* input, size_t inputSize, uint64_t* if (bw == kBwTailMarker) { if (count > 0) { // memcpy handles any alignment, no special case needed. - std::memcpy( - reinterpret_cast(output) + nDecoded * sizeof(uint64_t), inPtr, count * sizeof(uint64_t)); + std::memcpy(reinterpret_cast(output) + nDecoded * sizeof(uint64_t), inPtr, count * sizeof(uint64_t)); nDecoded += count; } break; diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index c3ac095cdc7..d39d82739c9 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -15,6 +15,7 @@ * limitations under the License. 
*/ #include "WholeStageResultIterator.h" +#include #include "VeloxBackend.h" #include "VeloxPlanConverter.h" #include "VeloxRuntime.h" @@ -40,33 +41,6 @@ namespace gluten { namespace { -// metrics -const std::string kDynamicFiltersProduced = "dynamicFiltersProduced"; -const std::string kDynamicFiltersAccepted = "dynamicFiltersAccepted"; -const std::string kReplacedWithDynamicFilterRows = "replacedWithDynamicFilterRows"; -const std::string kDynamicFilterInputRows = "dynamicFilterInputRows"; -const std::string kFlushRowCount = "flushRowCount"; -const std::string kAbandonedPartialAggregationRows = "abandonedPartialAggregationRows"; -const std::string kLoadedToValueHook = "loadedToValueHook"; -const std::string kBloomFilterBlocksByteSize = "bloomFilterSize"; -const std::string kTotalScanTime = "totalScanTime"; -const std::string kSkippedSplits = "skippedSplits"; -const std::string kProcessedSplits = "processedSplits"; -const std::string kSkippedStrides = "skippedStrides"; -const std::string kProcessedStrides = "processedStrides"; -const std::string kRemainingFilterTime = "totalRemainingFilterWallNanos"; -const std::string kIoWaitTime = "ioWaitWallNanos"; -const std::string kStorageReadBytes = "storageReadBytes"; -const std::string kLocalReadBytes = "localReadBytes"; -const std::string kRamReadBytes = "ramReadBytes"; -const std::string kPreloadSplits = "readyPreloadedSplits"; -const std::string kPageLoadTime = "pageLoadTimeNs"; -const std::string kDataSourceAddSplitWallNanos = "dataSourceAddSplitWallNanos"; -const std::string kWaitForPreloadSplitNanos = "waitForPreloadSplitNanos"; -const std::string kDataSourceReadWallNanos = "dataSourceReadWallNanos"; -const std::string kNumWrittenFiles = "numWrittenFiles"; -const std::string kWriteIOTime = "writeIOWallNanos"; - // others const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"; @@ -438,116 +412,55 @@ void WholeStageResultIterator::collectMetrics() { } auto planStats = velox::exec::toPlanStats(taskStats); 
- // Calculate the total number of metrics. - int statsNum = 0; + folly::dynamic orderedNodeIds = folly::dynamic::array(); + folly::dynamic omittedNodeIds = folly::dynamic::array(); + folly::dynamic nodeStats = folly::dynamic::object(); + unsigned int statsNum = 0; + for (int idx = 0; idx < orderedNodeIds_.size(); idx++) { const auto& nodeId = orderedNodeIds_[idx]; + orderedNodeIds.push_back(nodeId); + if (planStats.find(nodeId) == planStats.end()) { if (omittedNodeIds_.find(nodeId) == omittedNodeIds_.end()) { LOG(WARNING) << "Not found node id: " << nodeId; LOG(WARNING) << "Plan Node: " << std::endl << veloxPlan_->toString(true, true); throw std::runtime_error("Node id cannot be found in plan status."); } - // Special handing for Filter over Project case. Filter metrics are - // omitted. + omittedNodeIds.push_back(nodeId); statsNum += 1; continue; } - statsNum += planStats.at(nodeId).operatorStats.size(); - } - - metrics_ = std::make_unique(statsNum); - - int metricIndex = 0; - for (int idx = 0; idx < orderedNodeIds_.size(); idx++) { - metrics_->get(Metrics::kLoadLazyVectorTime)[metricIndex] = 0; - - const auto& nodeId = orderedNodeIds_[idx]; - if (planStats.find(nodeId) == planStats.end()) { - // Special handing for Filter over Project case. Filter metrics are - // omitted. - metrics_->get(Metrics::kOutputRows)[metricIndex] = 0; - metrics_->get(Metrics::kOutputVectors)[metricIndex] = 0; - metrics_->get(Metrics::kOutputBytes)[metricIndex] = 0; - metrics_->get(Metrics::kCpuCount)[metricIndex] = 0; - metrics_->get(Metrics::kWallNanos)[metricIndex] = 0; - metrics_->get(Metrics::kPeakMemoryBytes)[metricIndex] = 0; - metrics_->get(Metrics::kNumMemoryAllocations)[metricIndex] = 0; - metricIndex += 1; - continue; - } const auto& stats = planStats.at(nodeId); - // Add each operator stats into metrics. 
+ folly::dynamic operatorStats = folly::dynamic::array(); for (const auto& entry : stats.operatorStats) { - const auto& second = entry.second; - metrics_->get(Metrics::kInputRows)[metricIndex] = second->inputRows; - metrics_->get(Metrics::kInputVectors)[metricIndex] = second->inputVectors; - metrics_->get(Metrics::kInputBytes)[metricIndex] = second->inputBytes; - metrics_->get(Metrics::kRawInputRows)[metricIndex] = second->rawInputRows; - metrics_->get(Metrics::kRawInputBytes)[metricIndex] = second->rawInputBytes; - metrics_->get(Metrics::kOutputRows)[metricIndex] = second->outputRows; - metrics_->get(Metrics::kOutputVectors)[metricIndex] = second->outputVectors; - metrics_->get(Metrics::kOutputBytes)[metricIndex] = second->outputBytes; - metrics_->get(Metrics::kCpuCount)[metricIndex] = second->cpuWallTiming.count; - metrics_->get(Metrics::kWallNanos)[metricIndex] = second->cpuWallTiming.wallNanos; - metrics_->get(Metrics::kPeakMemoryBytes)[metricIndex] = second->peakMemoryBytes; - metrics_->get(Metrics::kNumMemoryAllocations)[metricIndex] = second->numMemoryAllocations; - metrics_->get(Metrics::kSpilledInputBytes)[metricIndex] = second->spilledInputBytes; - metrics_->get(Metrics::kSpilledBytes)[metricIndex] = second->spilledBytes; - metrics_->get(Metrics::kSpilledRows)[metricIndex] = second->spilledRows; - metrics_->get(Metrics::kSpilledPartitions)[metricIndex] = second->spilledPartitions; - metrics_->get(Metrics::kSpilledFiles)[metricIndex] = second->spilledFiles; - metrics_->get(Metrics::kNumDynamicFiltersProduced)[metricIndex] = - runtimeMetric("sum", second->customStats, kDynamicFiltersProduced); - metrics_->get(Metrics::kNumDynamicFiltersAccepted)[metricIndex] = - runtimeMetric("sum", second->customStats, kDynamicFiltersAccepted); - metrics_->get(Metrics::kNumReplacedWithDynamicFilterRows)[metricIndex] = - runtimeMetric("sum", second->customStats, kReplacedWithDynamicFilterRows); - metrics_->get(Metrics::kNumDynamicFilterInputRows)[metricIndex] = - 
runtimeMetric("sum", second->customStats, kDynamicFilterInputRows); - metrics_->get(Metrics::kFlushRowCount)[metricIndex] = runtimeMetric("sum", second->customStats, kFlushRowCount); - metrics_->get(Metrics::kAbandonedPartialAggregationRows)[metricIndex] = - runtimeMetric("sum", second->customStats, kAbandonedPartialAggregationRows); - metrics_->get(Metrics::kLoadedToValueHook)[metricIndex] = - runtimeMetric("sum", second->customStats, kLoadedToValueHook); - metrics_->get(Metrics::kBloomFilterBlocksByteSize)[metricIndex] = - runtimeMetric("sum", second->customStats, kBloomFilterBlocksByteSize); - metrics_->get(Metrics::kScanTime)[metricIndex] = runtimeMetric("sum", second->customStats, kTotalScanTime); - metrics_->get(Metrics::kSkippedSplits)[metricIndex] = runtimeMetric("sum", second->customStats, kSkippedSplits); - metrics_->get(Metrics::kProcessedSplits)[metricIndex] = - runtimeMetric("sum", second->customStats, kProcessedSplits); - metrics_->get(Metrics::kSkippedStrides)[metricIndex] = runtimeMetric("sum", second->customStats, kSkippedStrides); - metrics_->get(Metrics::kProcessedStrides)[metricIndex] = - runtimeMetric("sum", second->customStats, kProcessedStrides); - metrics_->get(Metrics::kRemainingFilterTime)[metricIndex] = - runtimeMetric("sum", second->customStats, kRemainingFilterTime); - metrics_->get(Metrics::kIoWaitTime)[metricIndex] = runtimeMetric("sum", second->customStats, kIoWaitTime); - metrics_->get(Metrics::kStorageReadBytes)[metricIndex] = - runtimeMetric("sum", second->customStats, kStorageReadBytes); - metrics_->get(Metrics::kStorageReads)[metricIndex] = - runtimeMetric("count", second->customStats, kStorageReadBytes); - metrics_->get(Metrics::kLocalReadBytes)[metricIndex] = runtimeMetric("sum", second->customStats, kLocalReadBytes); - metrics_->get(Metrics::kRamReadBytes)[metricIndex] = runtimeMetric("sum", second->customStats, kRamReadBytes); - metrics_->get(Metrics::kPreloadSplits)[metricIndex] = - runtimeMetric("sum", 
entry.second->customStats, kPreloadSplits); - metrics_->get(Metrics::kPageLoadTime)[metricIndex] = runtimeMetric("sum", second->customStats, kPageLoadTime); - metrics_->get(Metrics::kDataSourceAddSplitWallNanos)[metricIndex] = - runtimeMetric("sum", second->customStats, kDataSourceAddSplitWallNanos) + - runtimeMetric("sum", second->customStats, kWaitForPreloadSplitNanos); - metrics_->get(Metrics::kDataSourceReadWallNanos)[metricIndex] = - runtimeMetric("sum", second->customStats, kDataSourceReadWallNanos); - metrics_->get(Metrics::kNumWrittenFiles)[metricIndex] = - runtimeMetric("sum", entry.second->customStats, kNumWrittenFiles); - metrics_->get(Metrics::kPhysicalWrittenBytes)[metricIndex] = second->physicalWrittenBytes; - metrics_->get(Metrics::kWriteIOTime)[metricIndex] = runtimeMetric("sum", second->customStats, kWriteIOTime); - - metricIndex += 1; + const auto& opStats = entry.second; + folly::dynamic customStats = folly::dynamic::object(); + for (const auto& customMetric : opStats->customStats) { + customStats[customMetric.first] = folly::dynamic::object("sum", customMetric.second.sum)( + "count", customMetric.second.count)("min", customMetric.second.min)("max", customMetric.second.max); + } + + operatorStats.push_back(folly::dynamic::object("inputRows", opStats->inputRows)( + "inputVectors", opStats->inputVectors)("inputBytes", opStats->inputBytes)( + "rawInputRows", opStats->rawInputRows)("rawInputBytes", opStats->rawInputBytes)( + "outputRows", opStats->outputRows)("outputVectors", opStats->outputVectors)( + "outputBytes", opStats->outputBytes)("cpuCount", opStats->cpuWallTiming.count)( + "wallNanos", opStats->cpuWallTiming.wallNanos)("peakMemoryBytes", opStats->peakMemoryBytes)( + "numMemoryAllocations", opStats->numMemoryAllocations)("spilledInputBytes", opStats->spilledInputBytes)( + "spilledBytes", opStats->spilledBytes)("spilledRows", opStats->spilledRows)( + "spilledPartitions", opStats->spilledPartitions)("spilledFiles", opStats->spilledFiles)( + 
"physicalWrittenBytes", opStats->physicalWrittenBytes)("customStats", customStats)); } + + statsNum += static_cast(operatorStats.size()); + nodeStats[nodeId] = folly::dynamic::object("operatorStats", operatorStats); } - // Put the loadLazyVector time into the metrics of the last operator. - metrics_->get(Metrics::kLoadLazyVectorTime)[orderedNodeIds_.size() - 1] = loadLazyVectorTime_; + folly::dynamic payload = folly::dynamic::object("orderedNodeIds", orderedNodeIds)("omittedNodeIds", omittedNodeIds)( + "loadLazyVectorTime", loadLazyVectorTime_)("nodeStats", nodeStats); + metrics_ = std::make_unique(statsNum, folly::toJson(payload)); // Populate the metrics with task stats for long running tasks. if (const int64_t collectTaskStatsThreshold = @@ -560,27 +473,6 @@ void WholeStageResultIterator::collectMetrics() { } } -int64_t WholeStageResultIterator::runtimeMetric( - const std::string& type, - const std::unordered_map& runtimeStats, - const std::string& metricId) { - if (runtimeStats.find(metricId) == runtimeStats.end()) { - return 0; - } - - if (type == "sum") { - return runtimeStats.at(metricId).sum; - } else if (type == "count") { - return runtimeStats.at(metricId).count; - } else if (type == "min") { - return runtimeStats.at(metricId).min; - } else if (type == "max") { - return runtimeStats.at(metricId).max; - } else { - return 0; - } -} - std::unordered_map WholeStageResultIterator::getQueryContextConf() { std::unordered_map configs = {}; // Find batch size from Spark confs. If found, set the preferred and max batch size. diff --git a/cpp/velox/compute/WholeStageResultIterator.h b/cpp/velox/compute/WholeStageResultIterator.h index 4fcc002ffd2..601ac2c594e 100644 --- a/cpp/velox/compute/WholeStageResultIterator.h +++ b/cpp/velox/compute/WholeStageResultIterator.h @@ -121,12 +121,6 @@ class WholeStageResultIterator : public SplitAwareColumnarBatchIterator { /// Collect Velox metrics. void collectMetrics(); - /// Return a certain type of runtime metric. 
Supported metric types are: sum, count, min, max. - static int64_t runtimeMetric( - const std::string& type, - const std::unordered_map& runtimeStats, - const std::string& metricId); - /// Memory. VeloxMemoryManager* memoryManager_; diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc index 9772d752b8f..c34d3f8170b 100644 --- a/cpp/velox/tests/RuntimeTest.cc +++ b/cpp/velox/tests/RuntimeTest.cc @@ -89,7 +89,7 @@ class DummyRuntime final : public Runtime { throw GlutenException("Not yet implemented"); } Metrics* getMetrics(ColumnarBatchIterator* rawIter, int64_t exportNanos) override { - static Metrics m(1); + static Metrics m(0, R"({"orderedNodeIds":[],"omittedNodeIds":[],"loadLazyVectorTime":0,"nodeStats":{}})"); return &m; } std::shared_ptr createShuffleReader( diff --git a/docs/developers/MetricsFramework.md b/docs/developers/MetricsFramework.md new file mode 100644 index 00000000000..5cd31946e9a --- /dev/null +++ b/docs/developers/MetricsFramework.md @@ -0,0 +1,336 @@ +--- +layout: page +title: Metrics Framework +nav_order: 15 +parent: Developer Overview +--- + +# Metrics Framework + +This document explains how Velox operator metrics are mapped back to Gluten +Spark SQL metrics. The mapping has four ordered steps: + +1. Native code treefies the Velox plan into `orderedNodeIds`. +2. Scala parses the JSON payload into a flat `JList[OperatorMetrics]`. +3. Scala treefies the Spark plan into `MetricsUpdaterTree`. +4. Scala walks that tree and consumes the flattened native metrics in order. + +The JSON transport keeps the JNI boundary small and stable. C++ reports named +Velox stats in a deterministic order; Scala owns the mapping from those stats to +Gluten operator metrics. + +## Mapping Overview + +The metrics mapping joins three views of the same execution: + +- Velox plan node ids and task stats from native execution. +- Substrait rel ids recorded in `operatorToRelsMap`. +- Spark physical operators, each with a `MetricsUpdater`.
+ +During planning, Gluten assigns an operator id to each transform operator and +records the Substrait rel ids generated for that operator: + +```text +operatorToRelsMap: Spark operator id -> Substrait rel ids +``` + +After execution, native code serializes Velox stats in `orderedNodeIds` order. +Scala parses that JSON into: + +```text +Velox JSON node stats -> JList[OperatorMetrics] +``` + +Finally, `MetricsUtil.updateTransformerMetricsInternal` walks the +`MetricsUpdaterTree`, `operatorToRelsMap`, and native metric list together. The +current implementation consumes both Spark operator ids and native metric indexes +from the end: + +```text +operatorIdx = relMap.size() - 1 +metricsIdx = nativeMetrics.size() - 1 +``` + +Each Spark operator consumes the native metric suites that correspond to its +Substrait rel ids, merges or interprets them, and writes the final values into +Spark `SQLMetric`s. + +## Step 1: Native Treefy to orderedNodeIds + +`orderedNodeIds` is produced by `WholeStageResultIterator::getOrderedNodeIds`. +This is a native treefy process over the Velox plan. It converts the Velox plan +tree into a deterministic list of node ids that Scala can later flatten into +`OperatorMetrics`. + +This step is required because Velox task stats are keyed by plan node id. A map +of stats does not preserve the traversal order needed by Gluten's metrics +updater tree. Native code must provide that order explicitly. + +For ordinary Velox nodes, `getOrderedNodeIds` performs post-order traversal: + +```text +visit all sources first +then append current node id +``` + +This gives Scala a list that matches Substrait rel order and can be consumed in +reverse by decrementing `metricsIdx`. + +`getOrderedNodeIds` also encodes Velox-specific plan-shape adjustments that +Scala cannot reliably infer from task stats alone: + +- For Project nodes, it visits the source first and then the Project node. 
+- If the Project source is a Filter node, Velox has fused that Filter and the + Project above it into one FilterProject operator. The Filter node has no + independent stats, so native code records the Filter id in `omittedNodeIds`. +- For Union, Velox represents the Spark union through a gather + `LocalPartitionNode` plus projected children. Native code walks through the + projected children and then records the gather node for the union root. + +The native treefy result becomes the ordering contract for the rest of the +framework: + +```text +Velox plan tree + -> getOrderedNodeIds + -> orderedNodeIds + omittedNodeIds +``` + +Without `orderedNodeIds`, Scala would have to depend on the iteration order of +Velox's `planStats` map or reconstruct Velox-specific plan rewrites after the +fact. Either option would make metrics assignment fragile, especially for +operators that are fused, omitted, or expanded into multiple Velox nodes. + +## Step 2: JSON to OperatorMetrics + +After the Velox iterator finishes, C++ reads Velox task stats and serializes a +JSON payload with these fields: + +- `orderedNodeIds`: Velox plan node ids in the native treefy order. +- `omittedNodeIds`: expected nodes that do not have Velox stats. +- `nodeStats`: per-node Velox operator stats. +- `loadLazyVectorTime`: Gluten lazy vector loading time. + +Scala parses the payload in `MetricsUtil.parseNativeOperatorMetrics`: + +1. Iterate through `orderedNodeIds`. +2. Look up each node id in `nodeStats`. +3. Convert every Velox operator stat for the node into `OperatorMetrics`. +4. If the node id is in `omittedNodeIds` and has no stat, insert an empty + `OperatorMetrics` placeholder. +5. Attach `loadLazyVectorTime` to the last ordered node. +6. Validate that the parsed count matches `Metrics.numMetrics`. + +This produces the flat `JList[OperatorMetrics]` that the Spark updater tree +will consume.
+ +```text +orderedNodeIds: [n0, n1, n2] +nodeStats: + n0 -> [stat0] + n1 -> [stat1, stat2] + n2 -> [stat3] + +flattened nativeMetrics: + [OperatorMetrics(stat0), + OperatorMetrics(stat1), + OperatorMetrics(stat2), + OperatorMetrics(stat3)] +``` + +If a node is omitted, Scala still inserts a zero-value placeholder so native +metric indexes continue to line up with the updater traversal. + +## Step 3: Spark Plan to MetricsUpdaterTree + +`MetricsUtil.treeifyMetricsUpdaters(plan)` converts the Spark physical plan into +a tree of metrics updaters. This is the Spark-side treefy process. The resulting +tree describes which Spark operators should receive native metrics and in what +child order they should be traversed. + +The important cases are: + +- `HashJoinLikeExecTransformer`: creates a join updater node with children in + `(buildPlan, streamedPlan)` order. +- `SortMergeJoinExecTransformer`: creates a join updater node with children in + `(bufferedPlan, streamedPlan)` order. +- `TransformSupport` with `MetricsUpdater.None`: skips the current node and + treeifies its child. This is used when a Spark node exists for planning shape + but should not receive native metrics itself. +- Other `TransformSupport`: creates an updater node and treeifies + `children.reverse`. +- Non-transform Spark nodes: become `MetricsUpdater.Terminate`, which stops + native metric propagation for that branch. + +The child reversal is intentional. Native metrics are later consumed from the +end of the flattened list, so the updater tree must mirror the order produced by +Substrait planning and native `orderedNodeIds`. + +Conceptually: + +```text +SparkPlan + -> treeifyMetricsUpdaters + -> MetricsUpdaterTree(updater, children) +``` + +The tree does not contain metric values. It only contains the updater topology +needed to replay native metrics onto Spark operators. 
+ +## Step 4: Consume and Map Suites + +In the Scala mapping code, one `OperatorMetrics` object represents one native +metric suite. A suite usually corresponds to one Velox operator stat. Some Spark +operators consume one suite; others consume multiple suites because the Spark +operator expands to multiple Substrait/Velox operators. + +The number of suites initially assigned to a Spark operator is: + +```text +relMap(operatorIdx).size() +``` + +`updateTransformerMetricsInternal` performs the operator-level mapping. For each +updater node, it: + +1. Reads the Substrait rel ids for the current `operatorIdx`. +2. Consumes one native `OperatorMetrics` suite per rel id. +3. Applies operator-specific handling. +4. Recurses into child updater nodes with decremented indexes. + +For a normal unary operator, the consumed suites are merged and passed to: + +```text +u.updateNativeMetrics(mergedOperatorMetrics) +``` + +The merge behavior is designed around Velox pipeline shape: + +- Input-side counters are taken from the last consumed suite. +- Output-side and write counters are taken from the first consumed suite. +- CPU, wall time, spill, allocation, and most custom counters are accumulated. +- Peak memory uses the maximum value. + +This gives the Spark operator one coherent metric row even when it was +implemented by multiple native rels. + +The alignment can be summarized as: + +```text +native orderedNodeIds treefy + -> flattened OperatorMetrics + -> Spark MetricsUpdaterTree traversal + -> Spark SQLMetric updates +``` + +## Operator-Specific Mapping + +Some operators do not follow the simple "consume rel count, merge, update" rule. + +### Joins + +Join updaters consume the suites assigned by `relMap`, then consume one +additional suite for the build/probe side metrics. The updater also receives join +parameters from planning, so it can map Velox join-side values to the correct +Spark SQL metrics. 
+ +`HashJoinLikeExecTransformer` and `SortMergeJoinExecTransformer` also use custom +tree child ordering during Spark-side treefy, because build/buffered and +streamed sides must line up with the native traversal. + +### Union + +`UnionMetricsUpdater` consumes one extra suite and updates union-specific metrics +from the combined native values. + +### Hash Aggregate + +`HashAggregateMetricsUpdater` uses aggregation parameters recorded during +planning. The native suites still come from `relMap`, but the updater needs +those parameters to decide how aggregation metrics map to Spark metrics. + +### Limit Over Sort + +Velox may implement `Limit` over `Sort` as a TopN-style native operator. In that +case, the native metric suite belongs to the sort updater. The limit updater does +not update metrics and does not consume a suite, so the downstream indexes +remain aligned. + +## End-to-End Example + +For a simple transformed plan: + +```text +Project + Filter + Scan +``` + +native treefy produces: + +```text +orderedNodeIds: + [scan node, filter node, project node] +``` + +Scala parses the JSON into: + +```text +nativeMetrics: + 0 -> scan suite + 1 -> filter suite (zero-value placeholder: the Filter id is in omittedNodeIds) + 2 -> project suite +``` + +Spark-side treefy builds: + +```text +ProjectUpdater + FilterUpdater + ScanUpdater +``` + +Suppose planning recorded: + +```text +operatorToRelsMap: + 0 -> [scan rel] + 1 -> [filter rel] + 2 -> [project rel] +``` + +The updater starts from the end: + +```text +operatorIdx = 2, metricsIdx = 2 +``` + +It updates project first, then filter, then scan. Each step consumes the suite +for the current operator and decrements both indexes. More complex operators use +the same traversal but may consume more than one suite. + +## Adding or Debugging a Mapping + +When adding a metric or debugging a wrong value, follow the same path as runtime: + +1. Check native `orderedNodeIds` and `omittedNodeIds` if the wrong Velox node is + being flattened or a fused node is missing. +2.
Check that `parseNativeOperatorMetrics` produces the expected number and + order of `OperatorMetrics` suites. +3. Check the Spark metric key in `VeloxMetricsApi`. +4. Check the target `MetricsUpdater` to see how the `OperatorMetrics` suite is + written to Spark SQL metrics. +5. Check whether the operator consumes a normal number of suites or has special + handling in `updateTransformerMetricsInternal`. +6. Check Spark-side treefy child ordering if the wrong side of a join or + multi-child operator is being updated. + +The most useful invariant is: + +```text +parsed native metric count == Metrics.numMetrics +``` + +If that holds but values are assigned to the wrong Spark operator, inspect the +native `orderedNodeIds`, the `MetricsUpdaterTree` shape, `operatorToRelsMap`, +and any operator-specific extra suite consumption. diff --git a/mkdocs.yml b/mkdocs.yml index 69aa0efcd38..b5f6fbd9631 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -62,6 +62,7 @@ nav: - Velox Backend CI: developers/velox-backend-CI.md - Build in Docker: developers/velox-backend-build-in-docker.md - Query Trace: developers/QueryTrace.md + - Metrics Framework: developers/MetricsFramework.md - Memory Profiling: developers/ProfileMemoryOfGlutenWithVelox.md - Dynamic Off-Heap Sizing: developers/VeloxDynamicSizingOffheap.md - Partial Project: developers/PartialProject.md