From 0270b734b8a41a4dcf590345d776ffca3d405d95 Mon Sep 17 00:00:00 2001 From: Luis Penaranda <11061462+luis4a0@users.noreply.github.com> Date: Tue, 19 May 2026 09:40:49 +0000 Subject: [PATCH 1/2] [CORE] Add customMetrics extension point to ShuffleWriterMetrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ShuffleWriterMetrics currently has a hand-rolled list of 9 scalar fields, two of which (`avgDictionaryFields`, `dictionarySize`) are Velox-specific. Adding more backend-specific scalars every time a backend needs another counter doesn't scale — other backends (ClickHouse, GPU, RSS) have the same need and the cross-backend coordination cost grows linearly per metric. This change adds a generic `std::unordered_map customMetrics` to ShuffleWriterMetrics that any shuffle writer can populate with backend-specific stats. It is plumbed through the existing JNI `stop()` serialization as two parallel arrays (keys + values) into `GlutenSplitResult`, where the JVM side reassembles them lazily into an unmodifiable `Map` on first access. Convention for keys: `..`. Spark-side surfacing (registration as SQLMetrics) happens per-key in the backend's MetricsApi; unknown keys are silently dropped on the Scala side so the producer can ship without coordinating with every downstream surface. Includes `GlutenSplitResultSuite` covering the JVM-side reassembly (empty / null / populated arrays, caching, immutability) so the JNI boundary is fenced by a unit test that doesn't need a full Spark / native round-trip. Generated-by: GitHub Copilot CLI (Claude Opus 4.7 1M context) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- cpp/core/jni/JniWrapper.cc | 29 +++++- cpp/core/shuffle/Options.h | 18 ++++ cpp/core/shuffle/ShuffleWriter.cc | 4 + cpp/core/shuffle/ShuffleWriter.h | 4 + .../gluten/vectorized/GlutenSplitResult.java | 47 ++++++++- .../vectorized/GlutenSplitResultSuite.scala | 97 +++++++++++++++++++ 6 files changed, 196 insertions(+), 3 deletions(-) create mode 100644 gluten-arrow/src/test/scala/org/apache/gluten/vectorized/GlutenSplitResultSuite.scala diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 46b9d7603ce..0842d40f146 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -269,7 +269,8 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { jniByteInputStreamClose = getMethodIdOrError(env, jniByteInputStreamClass, "close", "()V"); splitResultClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/GlutenSplitResult;"); - splitResultConstructor = getMethodIdOrError(env, splitResultClass, "", "(JJJJJJJJJJDJ[J[J)V"); + splitResultConstructor = + getMethodIdOrError(env, splitResultClass, "", "(JJJJJJJJJJDJ[J[J[Ljava/lang/String;[J)V"); metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;"); @@ -1161,6 +1162,28 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap auto rawSrc = reinterpret_cast(rawPartitionLengths.data()); env->SetLongArrayRegion(rawPartitionLengthArr, 0, rawPartitionLengths.size(), rawSrc); + // Marshal backend-specific custom metrics as two parallel arrays. Keeping + // them parallel (rather than constructing a Java HashMap from JNI) keeps + // the JNI call cheap; the Java POJO reconstructs the map lazily on the + // first `getCustomMetrics()` call. + const auto& customMetrics = shuffleWriter->customMetrics(); + auto customMetricsKeyArr = env->NewObjectArray(customMetrics.size(), env->FindClass("java/lang/String"), nullptr); + auto customMetricsValueArr = env->NewLongArray(customMetrics.size()); + { + std::vector customMetricsValues; + customMetricsValues.reserve(customMetrics.size()); + jsize idx = 0; + for (const auto& kv : customMetrics) { + jstring keyStr = env->NewStringUTF(kv.first.c_str()); + env->SetObjectArrayElement(customMetricsKeyArr, idx++, keyStr); + env->DeleteLocalRef(keyStr); + customMetricsValues.push_back(static_cast(kv.second)); + } + if (!customMetricsValues.empty()) { + env->SetLongArrayRegion(customMetricsValueArr, 0, customMetricsValues.size(), customMetricsValues.data()); + } + } + jobject splitResult = env->NewObject( splitResultClass, splitResultConstructor, @@ -1177,7 +1200,9 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap shuffleWriter->avgDictionaryFields(), shuffleWriter->dictionarySize(), partitionLengthArr, - rawPartitionLengthArr); + rawPartitionLengthArr, + customMetricsKeyArr, + customMetricsValueArr); return splitResult; JNI_METHOD_END(nullptr) diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h index 649a1647749..9d21adae92e 100644 --- a/cpp/core/shuffle/Options.h +++ b/cpp/core/shuffle/Options.h @@ -24,6 +24,9 @@ #include #include +#include +#include + namespace gluten { static constexpr int16_t kDefaultBatchSize = 4096; @@ -234,5 +237,20 @@ struct ShuffleWriterMetrics { int64_t dictionarySize{0}; std::vector partitionLengths{}; std::vector rawPartitionLengths{}; // Uncompressed size. + + // Backend-specific shuffle writer metrics that don't justify a dedicated + // scalar field on this struct (see CONTRIBUTING note in #12107 / #12108 / + // #12109 — Velox backend uses this for per-stage timer breakdowns + input + // encoding mix; ClickHouse / GPU / RSS backends are free to populate + // whatever scalar counters they want under their own namespaced keys). + // + // Convention for keys: ".." — e.g. + // "Velox.InputEncoding.Flat" / "Velox.SplitRV.FixedWidthWallNanos". + // Spark-side surfacing (registration as SQLMetrics, naming, accumulation + // across tasks) happens per-key in `VeloxMetricsApi` / `CHMetricsApi`; + // unknown keys are silently dropped so a backend can ship new metrics + // ahead of the Spark-side registration without breaking older Spark + // versions of Gluten. + std::unordered_map customMetrics{}; }; } // namespace gluten diff --git a/cpp/core/shuffle/ShuffleWriter.cc b/cpp/core/shuffle/ShuffleWriter.cc index 3f0feadfb0f..a00aa439c60 100644 --- a/cpp/core/shuffle/ShuffleWriter.cc +++ b/cpp/core/shuffle/ShuffleWriter.cc @@ -109,6 +109,10 @@ const std::vector& ShuffleWriter::rawPartitionLengths() const { return metrics_.rawPartitionLengths; } +const std::unordered_map& ShuffleWriter::customMetrics() const { + return metrics_.customMetrics; +} + ShuffleWriter::ShuffleWriter(int32_t numPartitions, Partitioning partitioning) : numPartitions_(numPartitions), partitioning_(partitioning) {} } // namespace gluten diff --git a/cpp/core/shuffle/ShuffleWriter.h b/cpp/core/shuffle/ShuffleWriter.h index 934ad090763..a6ec39c4511 100644 --- a/cpp/core/shuffle/ShuffleWriter.h +++ b/cpp/core/shuffle/ShuffleWriter.h @@ -67,6 +67,10 @@ class ShuffleWriter : public Reclaimable { const std::vector& rawPartitionLengths() const; + // Backend-specific shuffle writer metrics. See `ShuffleWriterMetrics` + // declaration in `Options.h` for the key-naming convention. + const std::unordered_map& customMetrics() const; + protected: ShuffleWriter(int32_t numPartitions, Partitioning partitioning); diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java index 96b2a3fc541..ec089ad2ec8 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java @@ -16,6 +16,10 @@ */ package org.apache.gluten.vectorized; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + public class GlutenSplitResult { private final long totalComputePidTime; private final long totalWriteTime; @@ -32,6 +36,15 @@ public class GlutenSplitResult { private final double avgDictionaryFields; private final long dictionarySize; + // Backend-specific shuffle writer metrics. Marshalled across JNI as two + // parallel arrays (keys + values) to keep the JNI call cheap; reassembled + // here into a Map on first access. See `ShuffleWriterMetrics::customMetrics` + // in `cpp/core/shuffle/Options.h` for the key-naming convention + // (`..`). + private final String[] customMetricsKeys; + private final long[] customMetricsValues; + private volatile Map customMetricsCache; + public GlutenSplitResult( long totalComputePidTime, long totalWriteTime, @@ -46,7 +59,9 @@ public GlutenSplitResult( double avgDictionaryFields, long dictionarySize, long[] partitionLengths, - long[] rawPartitionLengths) { + long[] rawPartitionLengths, + String[] customMetricsKeys, + long[] customMetricsValues) { this.totalComputePidTime = totalComputePidTime; this.totalWriteTime = totalWriteTime; this.totalEvictTime = totalEvictTime; @@ -61,6 +76,8 @@ public GlutenSplitResult( this.c2rTime = totalC2RTime; this.avgDictionaryFields = avgDictionaryFields; this.dictionarySize = dictionarySize; + this.customMetricsKeys = customMetricsKeys; + this.customMetricsValues = customMetricsValues; } public long getTotalComputePidTime() { @@ -122,4 +139,32 @@ public double getAvgDictionaryFields() { public long getDictionarySize() { return dictionarySize; } + + /** + * Backend-specific shuffle writer metrics, keyed by `..`. The map + * preserves the iteration order JNI marshalled, but callers should treat the map as unordered. + * Returns an empty map if the native side did not populate any custom metrics (e.g. older Gluten + * libs, or backends that don't yet emit any). The returned map is unmodifiable. + */ + public Map getCustomMetrics() { + Map cached = customMetricsCache; + if (cached != null) { + return cached; + } + synchronized (this) { + if (customMetricsCache != null) { + return customMetricsCache; + } + if (customMetricsKeys == null || customMetricsKeys.length == 0) { + customMetricsCache = Collections.emptyMap(); + } else { + LinkedHashMap map = new LinkedHashMap<>(customMetricsKeys.length); + for (int i = 0; i < customMetricsKeys.length; i++) { + map.put(customMetricsKeys[i], customMetricsValues[i]); + } + customMetricsCache = Collections.unmodifiableMap(map); + } + return customMetricsCache; + } + } } diff --git a/gluten-arrow/src/test/scala/org/apache/gluten/vectorized/GlutenSplitResultSuite.scala b/gluten-arrow/src/test/scala/org/apache/gluten/vectorized/GlutenSplitResultSuite.scala new file mode 100644 index 00000000000..be048c8bab8 --- /dev/null +++ b/gluten-arrow/src/test/scala/org/apache/gluten/vectorized/GlutenSplitResultSuite.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.vectorized + +import org.apache.spark.SparkFunSuite + +/** + * Unit tests for [[GlutenSplitResult]]'s `customMetrics` reassembly logic. The marshalled form + * across JNI is two parallel arrays (keys + values); the POJO reassembles them lazily on first + * `getCustomMetrics()` access and caches an unmodifiable map. These tests exercise the JVM-side + * boundary without needing a Spark / native-library round-trip. + */ +class GlutenSplitResultSuite extends SparkFunSuite { + + private def newResult(keys: Array[String], values: Array[Long]): GlutenSplitResult = { + new GlutenSplitResult( + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0L, + 0.0d, + 0L, + Array.empty[Long], + Array.empty[Long], + keys, + values) + } + + test("getCustomMetrics returns empty map when native side passed no entries") { + val r = newResult(Array.empty[String], Array.empty[Long]) + val m = r.getCustomMetrics + assert(m.isEmpty) + } + + test("getCustomMetrics returns empty map when native side passed null arrays") { + val r = newResult(null, null) + val m = r.getCustomMetrics + assert(m.isEmpty) + } + + test("getCustomMetrics reassembles the parallel-array form into a Map") { + val keys = Array("Velox.InputEncoding.Flat", "Velox.InputEncoding.Dictionary") + val values = Array(123L, 7L) + val r = newResult(keys, values) + val m = r.getCustomMetrics + assert(m.size() == 2) + assert(m.get("Velox.InputEncoding.Flat") == 123L) + assert(m.get("Velox.InputEncoding.Dictionary") == 7L) + } + + test("getCustomMetrics caches the reassembled map across calls") { + val r = newResult(Array("k"), Array(1L)) + val first = r.getCustomMetrics + val second = r.getCustomMetrics + // Same identity: cached result is returned on subsequent calls. + assert(first eq second) + } + + test("getCustomMetrics returns an unmodifiable map") { + val r = newResult(Array("k"), Array(1L)) + val m = r.getCustomMetrics + intercept[UnsupportedOperationException] { + m.put("x", 2L) + } + } + + test("getCustomMetrics is null-safe for code reading from older Gluten libs") { + // Older Gluten native builds may construct GlutenSplitResult without + // the customMetrics arrays at all — simulate by passing null/null. We + // already covered that, but assert here that the *empty* map is + // distinguishable from a populated one. + val empty = newResult(null, null).getCustomMetrics + val populated = newResult(Array("a"), Array(1L)).getCustomMetrics + assert(empty.isEmpty) + assert(!populated.isEmpty) + } +} From 722fa454131befa062b499293e7226ab909c3046 Mon Sep 17 00:00:00 2001 From: Luis Penaranda <11061462+luis4a0@users.noreply.github.com> Date: Tue, 19 May 2026 09:13:58 +0000 Subject: [PATCH 2/2] [CORE] Defensive length-match check on GlutenSplitResult.customMetrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If a producer ever ships mismatched key/value arrays through the JNI boundary, the AIOOBE inside the synchronized block in `getCustomMetrics()` would leave the volatile cache field unassigned — every subsequent call would re-enter the synchronized block and re-throw, an unhelpful failure mode. Adds an explicit length-match check (before the cache field is assigned to a partial map) that throws `IllegalStateException` mentioning both array lengths so the producer-side bug is unambiguous. Two new `GlutenSplitResultSuite` cases: - mismatched lengths - null values array when keys is non-empty Both assert the second call still throws (cache stays null on failure paths). Full suite is now 8/8 locally. Generated-by: GitHub Copilot CLI (Claude Opus 4.7 1M context) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../gluten/vectorized/GlutenSplitResult.java | 17 +++++++++++++++-- .../vectorized/GlutenSplitResultSuite.scala | 19 +++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java index ec089ad2ec8..2de52573592 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java @@ -143,8 +143,8 @@ public long getDictionarySize() { /** * Backend-specific shuffle writer metrics, keyed by `..`. The map * preserves the iteration order JNI marshalled, but callers should treat the map as unordered. - * Returns an empty map if the native side did not populate any custom metrics (e.g. older Gluten - * libs, or backends that don't yet emit any). The returned map is unmodifiable. + * Returns an empty map if the native side did not populate any custom metrics. The returned map + * is unmodifiable. */ public Map getCustomMetrics() { Map cached = customMetricsCache; @@ -158,6 +158,19 @@ public Map getCustomMetrics() { if (customMetricsKeys == null || customMetricsKeys.length == 0) { customMetricsCache = Collections.emptyMap(); } else { + // Defensive check: if a future native-side producer ever ships + // mismatched arrays, fail loudly here (before the cache field is + // assigned to a partial result). Without this, the AIOOBE inside the + // loop would leave `customMetricsCache` null and every subsequent + // `getCustomMetrics()` call would re-enter the synchronized block + // and re-throw — a confusing failure mode. + if (customMetricsValues == null || customMetricsValues.length != customMetricsKeys.length) { + throw new IllegalStateException( + "customMetricsKeys / customMetricsValues length mismatch: " + + customMetricsKeys.length + + " vs " + + (customMetricsValues == null ? "null" : customMetricsValues.length)); + } LinkedHashMap map = new LinkedHashMap<>(customMetricsKeys.length); for (int i = 0; i < customMetricsKeys.length; i++) { map.put(customMetricsKeys[i], customMetricsValues[i]); diff --git a/gluten-arrow/src/test/scala/org/apache/gluten/vectorized/GlutenSplitResultSuite.scala b/gluten-arrow/src/test/scala/org/apache/gluten/vectorized/GlutenSplitResultSuite.scala index be048c8bab8..565f40ffbe0 100644 --- a/gluten-arrow/src/test/scala/org/apache/gluten/vectorized/GlutenSplitResultSuite.scala +++ b/gluten-arrow/src/test/scala/org/apache/gluten/vectorized/GlutenSplitResultSuite.scala @@ -94,4 +94,23 @@ class GlutenSplitResultSuite extends SparkFunSuite { assert(empty.isEmpty) assert(!populated.isEmpty) } + + test("getCustomMetrics fails loudly on mismatched key/value array lengths") { + // A future native-side producer that ships mismatched arrays must not + // silently corrupt the metrics map (and must not leave the lazy cache + // field unassigned so subsequent calls re-enter and re-throw). Assert + // that the first call throws IllegalStateException with both lengths + // mentioned, and that a second call still throws (the cache field is + // never assigned to a partial map). + val r = newResult(Array("a", "b"), Array(1L)) + val ex = intercept[IllegalStateException](r.getCustomMetrics) + assert(ex.getMessage.contains("2") && ex.getMessage.contains("1")) + intercept[IllegalStateException](r.getCustomMetrics) + } + + test("getCustomMetrics fails loudly when values array is null but keys is non-empty") { + val r = newResult(Array("a"), null) + val ex = intercept[IllegalStateException](r.getCustomMetrics) + assert(ex.getMessage.contains("null")) + } }