Skip to content

Commit 827f84b

Browse files
luis4a0Copilot
andcommitted
[CORE] Add customMetrics extension point to ShuffleWriterMetrics
ShuffleWriterMetrics currently has a hand-rolled list of 9 scalar fields, two of which (`avgDictionaryFields`, `dictionarySize`) are Velox-specific. Adding more backend-specific scalars every time someone needs another counter doesn't scale — other backends (ClickHouse, GPU, RSS) have the same need and the cross-backend coordination cost grows linearly per metric. This PR adds a generic `std::unordered_map<std::string, int64_t> customMetrics` to ShuffleWriterMetrics that any shuffle writer can populate with backend-specific stats. It is plumbed through the existing JNI `stop()` serialization as two parallel arrays (keys + values) into `GlutenSplitResult`, where the JVM side reassembles them lazily into an unmodifiable `Map<String, Long>` on first access. Convention for keys: `<Backend>.<Family>.<Stat>` — e.g. `Velox.InputEncoding.Flat` or `Velox.SplitRV.FixedWidthWallNanos`. Spark-side registration as SQLMetrics happens per-key in the backend's MetricsApi (`VeloxMetricsApi` / `CHMetricsApi`); unknown keys are silently dropped on the Scala side so a backend can ship new metrics ahead of the Spark-side registration without breaking older Spark wrappers. This commit only introduces the plumbing — no backend populates the map yet. The follow-up commit on this PR wires up the Velox hash shuffle writer as the first consumer. Includes `GlutenSplitResultSuite` covering the JVM-side reassembly (empty / null / populated arrays, caching, immutability) so the JNI boundary is fenced by a unit test that doesn't need a full Spark / native round-trip. Generated-by: GitHub Copilot CLI (Claude Opus 4.7 1M context) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 0b1e511 commit 827f84b

6 files changed

Lines changed: 196 additions & 3 deletions

File tree

cpp/core/jni/JniWrapper.cc

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,8 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
269269
jniByteInputStreamClose = getMethodIdOrError(env, jniByteInputStreamClass, "close", "()V");
270270

271271
splitResultClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/GlutenSplitResult;");
272-
splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>", "(JJJJJJJJJJDJ[J[J)V");
272+
splitResultConstructor =
273+
getMethodIdOrError(env, splitResultClass, "<init>", "(JJJJJJJJJJDJ[J[J[Ljava/lang/String;[J)V");
273274

274275
metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;");
275276

@@ -1161,6 +1162,28 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
11611162
auto rawSrc = reinterpret_cast<const jlong*>(rawPartitionLengths.data());
11621163
env->SetLongArrayRegion(rawPartitionLengthArr, 0, rawPartitionLengths.size(), rawSrc);
11631164

1165+
// Marshal backend-specific custom metrics as two parallel arrays. Keeping
1166+
// them parallel (rather than constructing a Java HashMap from JNI) keeps
1167+
// the JNI call cheap; the Java POJO reconstructs the map lazily on the
1168+
// first `getCustomMetrics()` call.
1169+
const auto& customMetrics = shuffleWriter->customMetrics();
1170+
auto customMetricsKeyArr = env->NewObjectArray(customMetrics.size(), env->FindClass("java/lang/String"), nullptr);
1171+
auto customMetricsValueArr = env->NewLongArray(customMetrics.size());
1172+
{
1173+
std::vector<jlong> customMetricsValues;
1174+
customMetricsValues.reserve(customMetrics.size());
1175+
jsize idx = 0;
1176+
for (const auto& kv : customMetrics) {
1177+
jstring keyStr = env->NewStringUTF(kv.first.c_str());
1178+
env->SetObjectArrayElement(customMetricsKeyArr, idx++, keyStr);
1179+
env->DeleteLocalRef(keyStr);
1180+
customMetricsValues.push_back(static_cast<jlong>(kv.second));
1181+
}
1182+
if (!customMetricsValues.empty()) {
1183+
env->SetLongArrayRegion(customMetricsValueArr, 0, customMetricsValues.size(), customMetricsValues.data());
1184+
}
1185+
}
1186+
11641187
jobject splitResult = env->NewObject(
11651188
splitResultClass,
11661189
splitResultConstructor,
@@ -1177,7 +1200,9 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
11771200
shuffleWriter->avgDictionaryFields(),
11781201
shuffleWriter->dictionarySize(),
11791202
partitionLengthArr,
1180-
rawPartitionLengthArr);
1203+
rawPartitionLengthArr,
1204+
customMetricsKeyArr,
1205+
customMetricsValueArr);
11811206

11821207
return splitResult;
11831208
JNI_METHOD_END(nullptr)

cpp/core/shuffle/Options.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
#include <arrow/ipc/options.h>
2525
#include <arrow/util/compression.h>
2626

27+
#include <string>
28+
#include <unordered_map>
29+
2730
namespace gluten {
2831

2932
static constexpr int16_t kDefaultBatchSize = 4096;
@@ -234,5 +237,20 @@ struct ShuffleWriterMetrics {
234237
int64_t dictionarySize{0};
235238
std::vector<int64_t> partitionLengths{};
236239
std::vector<int64_t> rawPartitionLengths{}; // Uncompressed size.
240+
241+
// Backend-specific shuffle writer metrics that don't justify a dedicated
242+
// scalar field on this struct (see CONTRIBUTING note in #12107 / #12108 /
243+
// #12109 — Velox backend uses this for per-stage timer breakdowns + input
244+
// encoding mix; ClickHouse / GPU / RSS backends are free to populate
245+
// whatever scalar counters they want under their own namespaced keys).
246+
//
247+
// Convention for keys: "<Backend>.<Family>.<Stat>" — e.g.
248+
// "Velox.InputEncoding.Flat" / "Velox.SplitRV.FixedWidthWallNanos".
249+
// Spark-side surfacing (registration as SQLMetrics, naming, accumulation
250+
// across tasks) happens per-key in `VeloxMetricsApi` / `CHMetricsApi`;
251+
// unknown keys are silently dropped so a backend can ship new metrics
252+
// ahead of the Spark-side registration without breaking older Spark
253+
// versions of Gluten.
254+
std::unordered_map<std::string, int64_t> customMetrics{};
237255
};
238256
} // namespace gluten

cpp/core/shuffle/ShuffleWriter.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ const std::vector<int64_t>& ShuffleWriter::rawPartitionLengths() const {
109109
return metrics_.rawPartitionLengths;
110110
}
111111

112+
const std::unordered_map<std::string, int64_t>& ShuffleWriter::customMetrics() const {
113+
return metrics_.customMetrics;
114+
}
115+
112116
ShuffleWriter::ShuffleWriter(int32_t numPartitions, Partitioning partitioning)
113117
: numPartitions_(numPartitions), partitioning_(partitioning) {}
114118
} // namespace gluten

cpp/core/shuffle/ShuffleWriter.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ class ShuffleWriter : public Reclaimable {
6767

6868
const std::vector<int64_t>& rawPartitionLengths() const;
6969

70+
// Backend-specific shuffle writer metrics. See `ShuffleWriterMetrics`
71+
// declaration in `Options.h` for the key-naming convention.
72+
const std::unordered_map<std::string, int64_t>& customMetrics() const;
73+
7074
protected:
7175
ShuffleWriter(int32_t numPartitions, Partitioning partitioning);
7276

gluten-arrow/src/main/java/org/apache/gluten/vectorized/GlutenSplitResult.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
*/
1717
package org.apache.gluten.vectorized;
1818

19+
import java.util.Collections;
20+
import java.util.LinkedHashMap;
21+
import java.util.Map;
22+
1923
public class GlutenSplitResult {
2024
private final long totalComputePidTime;
2125
private final long totalWriteTime;
@@ -32,6 +36,15 @@ public class GlutenSplitResult {
3236
private final double avgDictionaryFields;
3337
private final long dictionarySize;
3438

39+
// Backend-specific shuffle writer metrics. Marshalled across JNI as two
40+
// parallel arrays (keys + values) to keep the JNI call cheap; reassembled
41+
// here into a Map on first access. See `ShuffleWriterMetrics::customMetrics`
42+
// in `cpp/core/shuffle/Options.h` for the key-naming convention
43+
// (`<Backend>.<Family>.<Stat>`).
44+
private final String[] customMetricsKeys;
45+
private final long[] customMetricsValues;
46+
private volatile Map<String, Long> customMetricsCache;
47+
3548
public GlutenSplitResult(
3649
long totalComputePidTime,
3750
long totalWriteTime,
@@ -46,7 +59,9 @@ public GlutenSplitResult(
4659
double avgDictionaryFields,
4760
long dictionarySize,
4861
long[] partitionLengths,
49-
long[] rawPartitionLengths) {
62+
long[] rawPartitionLengths,
63+
String[] customMetricsKeys,
64+
long[] customMetricsValues) {
5065
this.totalComputePidTime = totalComputePidTime;
5166
this.totalWriteTime = totalWriteTime;
5267
this.totalEvictTime = totalEvictTime;
@@ -61,6 +76,8 @@ public GlutenSplitResult(
6176
this.c2rTime = totalC2RTime;
6277
this.avgDictionaryFields = avgDictionaryFields;
6378
this.dictionarySize = dictionarySize;
79+
this.customMetricsKeys = customMetricsKeys;
80+
this.customMetricsValues = customMetricsValues;
6481
}
6582

6683
public long getTotalComputePidTime() {
@@ -122,4 +139,32 @@ public double getAvgDictionaryFields() {
122139
public long getDictionarySize() {
123140
return dictionarySize;
124141
}
142+
143+
/**
144+
* Backend-specific shuffle writer metrics, keyed by `<Backend>.<Family>.<Stat>`. The map
145+
* preserves the iteration order JNI marshalled, but callers should treat the map as unordered.
146+
* Returns an empty map if the native side did not populate any custom metrics (e.g. older Gluten
147+
* libs, or backends that don't yet emit any). The returned map is unmodifiable.
148+
*/
149+
public Map<String, Long> getCustomMetrics() {
150+
Map<String, Long> cached = customMetricsCache;
151+
if (cached != null) {
152+
return cached;
153+
}
154+
synchronized (this) {
155+
if (customMetricsCache != null) {
156+
return customMetricsCache;
157+
}
158+
if (customMetricsKeys == null || customMetricsKeys.length == 0) {
159+
customMetricsCache = Collections.emptyMap();
160+
} else {
161+
LinkedHashMap<String, Long> map = new LinkedHashMap<>(customMetricsKeys.length);
162+
for (int i = 0; i < customMetricsKeys.length; i++) {
163+
map.put(customMetricsKeys[i], customMetricsValues[i]);
164+
}
165+
customMetricsCache = Collections.unmodifiableMap(map);
166+
}
167+
return customMetricsCache;
168+
}
169+
}
125170
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.gluten.vectorized
18+
19+
import org.apache.spark.SparkFunSuite
20+
21+
/**
22+
* Unit tests for [[GlutenSplitResult]]'s `customMetrics` reassembly logic. The marshalled form
23+
* across JNI is two parallel arrays (keys + values); the POJO reassembles them lazily on first
24+
* `getCustomMetrics()` access and caches an unmodifiable map. These tests exercise the JVM-side
25+
* boundary without needing a Spark / native-library round-trip.
26+
*/
27+
class GlutenSplitResultSuite extends SparkFunSuite {
28+
29+
private def newResult(keys: Array[String], values: Array[Long]): GlutenSplitResult = {
30+
new GlutenSplitResult(
31+
0L,
32+
0L,
33+
0L,
34+
0L,
35+
0L,
36+
0L,
37+
0L,
38+
0L,
39+
0L,
40+
0L,
41+
0.0d,
42+
0L,
43+
Array.empty[Long],
44+
Array.empty[Long],
45+
keys,
46+
values)
47+
}
48+
49+
test("getCustomMetrics returns empty map when native side passed no entries") {
50+
val r = newResult(Array.empty[String], Array.empty[Long])
51+
val m = r.getCustomMetrics
52+
assert(m.isEmpty)
53+
}
54+
55+
test("getCustomMetrics returns empty map when native side passed null arrays") {
56+
val r = newResult(null, null)
57+
val m = r.getCustomMetrics
58+
assert(m.isEmpty)
59+
}
60+
61+
test("getCustomMetrics reassembles the parallel-array form into a Map") {
62+
val keys = Array("Velox.InputEncoding.Flat", "Velox.InputEncoding.Dictionary")
63+
val values = Array(123L, 7L)
64+
val r = newResult(keys, values)
65+
val m = r.getCustomMetrics
66+
assert(m.size() == 2)
67+
assert(m.get("Velox.InputEncoding.Flat") == 123L)
68+
assert(m.get("Velox.InputEncoding.Dictionary") == 7L)
69+
}
70+
71+
test("getCustomMetrics caches the reassembled map across calls") {
72+
val r = newResult(Array("k"), Array(1L))
73+
val first = r.getCustomMetrics
74+
val second = r.getCustomMetrics
75+
// Same identity: cached result is returned on subsequent calls.
76+
assert(first eq second)
77+
}
78+
79+
test("getCustomMetrics returns an unmodifiable map") {
80+
val r = newResult(Array("k"), Array(1L))
81+
val m = r.getCustomMetrics
82+
intercept[UnsupportedOperationException] {
83+
m.put("x", 2L)
84+
}
85+
}
86+
87+
test("getCustomMetrics is null-safe for code reading from older Gluten libs") {
88+
// Older Gluten native builds may construct GlutenSplitResult without
89+
// the customMetrics arrays at all — simulate by passing null/null. We
90+
// already covered that, but assert here that the *empty* map is
91+
// distinguishable from a populated one.
92+
val empty = newResult(null, null).getCustomMetrics
93+
val populated = newResult(Array("a"), Array(1L)).getCustomMetrics
94+
assert(empty.isEmpty)
95+
assert(!populated.isEmpty)
96+
}
97+
}

0 commit comments

Comments
 (0)