Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
jniByteInputStreamClose = getMethodIdOrError(env, jniByteInputStreamClass, "close", "()V");

splitResultClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/GlutenSplitResult;");
splitResultConstructor = getMethodIdOrError(env, splitResultClass, "<init>", "(JJJJJJJJJJDJ[J[J)V");
splitResultConstructor =
getMethodIdOrError(env, splitResultClass, "<init>", "(JJJJJJJJJJDJ[J[J[Ljava/lang/String;[J)V");

metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;");

Expand Down Expand Up @@ -1161,6 +1162,28 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
auto rawSrc = reinterpret_cast<const jlong*>(rawPartitionLengths.data());
env->SetLongArrayRegion(rawPartitionLengthArr, 0, rawPartitionLengths.size(), rawSrc);

// Marshal backend-specific custom metrics as two parallel arrays. Keeping
// them parallel (rather than constructing a Java HashMap from JNI) keeps
// the JNI call cheap; the Java POJO reconstructs the map lazily on the
// first `getCustomMetrics()` call.
const auto& customMetrics = shuffleWriter->customMetrics();
auto customMetricsKeyArr = env->NewObjectArray(customMetrics.size(), env->FindClass("java/lang/String"), nullptr);
auto customMetricsValueArr = env->NewLongArray(customMetrics.size());
{
std::vector<jlong> customMetricsValues;
customMetricsValues.reserve(customMetrics.size());
jsize idx = 0;
for (const auto& kv : customMetrics) {
jstring keyStr = env->NewStringUTF(kv.first.c_str());
env->SetObjectArrayElement(customMetricsKeyArr, idx++, keyStr);
env->DeleteLocalRef(keyStr);
customMetricsValues.push_back(static_cast<jlong>(kv.second));
}
if (!customMetricsValues.empty()) {
env->SetLongArrayRegion(customMetricsValueArr, 0, customMetricsValues.size(), customMetricsValues.data());
}
}

jobject splitResult = env->NewObject(
splitResultClass,
splitResultConstructor,
Expand All @@ -1177,7 +1200,9 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrap
shuffleWriter->avgDictionaryFields(),
shuffleWriter->dictionarySize(),
partitionLengthArr,
rawPartitionLengthArr);
rawPartitionLengthArr,
customMetricsKeyArr,
customMetricsValueArr);

return splitResult;
JNI_METHOD_END(nullptr)
Expand Down
18 changes: 18 additions & 0 deletions cpp/core/shuffle/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
#include <arrow/ipc/options.h>
#include <arrow/util/compression.h>

#include <string>
#include <unordered_map>

namespace gluten {

static constexpr int16_t kDefaultBatchSize = 4096;
Expand Down Expand Up @@ -234,5 +237,20 @@ struct ShuffleWriterMetrics {
int64_t dictionarySize{0};
std::vector<int64_t> partitionLengths{};
std::vector<int64_t> rawPartitionLengths{}; // Uncompressed size.

// Backend-specific shuffle writer metrics that don't justify a dedicated
// scalar field on this struct (see CONTRIBUTING note in #12107 / #12108 /
// #12109 — Velox backend uses this for per-stage timer breakdowns + input
// encoding mix; ClickHouse / GPU / RSS backends are free to populate
// whatever scalar counters they want under their own namespaced keys).
//
// Convention for keys: "<Backend>.<Family>.<Stat>" — e.g.
// "Velox.InputEncoding.Flat" / "Velox.SplitRV.FixedWidthWallNanos".
// Spark-side surfacing (registration as SQLMetrics, naming, accumulation
// across tasks) happens per-key in `VeloxMetricsApi` / `CHMetricsApi`.
// Unknown keys are silently dropped, so a backend can ship new metrics
// ahead of the Spark-side registration without breaking older Spark-side
// builds of Gluten.
std::unordered_map<std::string, int64_t> customMetrics{};
};
} // namespace gluten
4 changes: 4 additions & 0 deletions cpp/core/shuffle/ShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ const std::vector<int64_t>& ShuffleWriter::rawPartitionLengths() const {
return metrics_.rawPartitionLengths;
}

const std::unordered_map<std::string, int64_t>& ShuffleWriter::customMetrics() const {
return metrics_.customMetrics;
}

// Constructs the writer by recording the partition count and the partitioning
// scheme in `numPartitions_` and `partitioning_`. The body is empty: no other
// work is performed here.
ShuffleWriter::ShuffleWriter(int32_t numPartitions, Partitioning partitioning)
: numPartitions_(numPartitions), partitioning_(partitioning) {}
} // namespace gluten
4 changes: 4 additions & 0 deletions cpp/core/shuffle/ShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ class ShuffleWriter : public Reclaimable {

const std::vector<int64_t>& rawPartitionLengths() const;

// Backend-specific shuffle writer metrics. See `ShuffleWriterMetrics`
// declaration in `Options.h` for the key-naming convention.
const std::unordered_map<std::string, int64_t>& customMetrics() const;

protected:
ShuffleWriter(int32_t numPartitions, Partitioning partitioning);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.apache.gluten.vectorized;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class GlutenSplitResult {
private final long totalComputePidTime;
private final long totalWriteTime;
Expand All @@ -32,6 +36,15 @@ public class GlutenSplitResult {
private final double avgDictionaryFields;
private final long dictionarySize;

// Backend-specific shuffle writer metrics. Marshalled across JNI as two
// parallel arrays (keys + values) to keep the JNI call cheap; reassembled
// here into a Map on first access. See `ShuffleWriterMetrics::customMetrics`
// in `cpp/core/shuffle/Options.h` for the key-naming convention
// (`<Backend>.<Family>.<Stat>`).
private final String[] customMetricsKeys;
private final long[] customMetricsValues;
private volatile Map<String, Long> customMetricsCache;

public GlutenSplitResult(
long totalComputePidTime,
long totalWriteTime,
Expand All @@ -46,7 +59,9 @@ public GlutenSplitResult(
double avgDictionaryFields,
long dictionarySize,
long[] partitionLengths,
long[] rawPartitionLengths) {
long[] rawPartitionLengths,
String[] customMetricsKeys,
long[] customMetricsValues) {
this.totalComputePidTime = totalComputePidTime;
this.totalWriteTime = totalWriteTime;
this.totalEvictTime = totalEvictTime;
Expand All @@ -61,6 +76,8 @@ public GlutenSplitResult(
this.c2rTime = totalC2RTime;
this.avgDictionaryFields = avgDictionaryFields;
this.dictionarySize = dictionarySize;
this.customMetricsKeys = customMetricsKeys;
this.customMetricsValues = customMetricsValues;
}

public long getTotalComputePidTime() {
Expand Down Expand Up @@ -122,4 +139,45 @@ public double getAvgDictionaryFields() {
public long getDictionarySize() {
return dictionarySize;
}

/**
* Backend-specific shuffle writer metrics, keyed by `<Backend>.<Family>.<Stat>`. The map
* preserves the iteration order JNI marshalled, but callers should treat the map as unordered.
* Returns an empty map if the native side did not populate any custom metrics. The returned map
* is unmodifiable.
*/
public Map<String, Long> getCustomMetrics() {
Map<String, Long> cached = customMetricsCache;
if (cached != null) {
return cached;
}
synchronized (this) {
if (customMetricsCache != null) {
return customMetricsCache;
}
if (customMetricsKeys == null || customMetricsKeys.length == 0) {
Comment thread
yaooqinn marked this conversation as resolved.
customMetricsCache = Collections.emptyMap();
} else {
// Defensive check: if a future native-side producer ever ships
// mismatched arrays, fail loudly here (before the cache field is
// assigned to a partial result). Without this, the AIOOBE inside the
// loop would leave `customMetricsCache` null and every subsequent
// `getCustomMetrics()` call would re-enter the synchronized block
// and re-throw — a confusing failure mode.
if (customMetricsValues == null || customMetricsValues.length != customMetricsKeys.length) {
throw new IllegalStateException(
"customMetricsKeys / customMetricsValues length mismatch: "
+ customMetricsKeys.length
+ " vs "
+ (customMetricsValues == null ? "null" : customMetricsValues.length));
}
LinkedHashMap<String, Long> map = new LinkedHashMap<>(customMetricsKeys.length);
for (int i = 0; i < customMetricsKeys.length; i++) {
map.put(customMetricsKeys[i], customMetricsValues[i]);
}
customMetricsCache = Collections.unmodifiableMap(map);
}
return customMetricsCache;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.vectorized

import org.apache.spark.SparkFunSuite

/**
 * Covers the `customMetrics` reassembly performed by [[GlutenSplitResult]]. The constructor
 * receives the metrics as two parallel arrays (keys + values); `getCustomMetrics()` turns them
 * into an unmodifiable map on first access and caches it. Every test here constructs the POJO
 * directly, so no native library or Spark job is involved.
 */
class GlutenSplitResultSuite extends SparkFunSuite {

  /**
   * Builds a [[GlutenSplitResult]] whose scalar metrics are all zero and whose partition-length
   * arrays are empty, so that only the custom-metrics arrays vary between tests.
   */
  private def newResult(keys: Array[String], values: Array[Long]): GlutenSplitResult = {
    val partitionLengths = Array.empty[Long]
    val rawPartitionLengths = Array.empty[Long]
    new GlutenSplitResult(
      0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, // ten long-valued scalar metrics
      0.0d, // avgDictionaryFields
      0L, // dictionarySize
      partitionLengths,
      rawPartitionLengths,
      keys,
      values
    )
  }

  test("getCustomMetrics returns empty map when native side passed no entries") {
    val metrics = newResult(Array.empty[String], Array.empty[Long]).getCustomMetrics
    assert(metrics.isEmpty)
  }

  test("getCustomMetrics returns empty map when native side passed null arrays") {
    val metrics = newResult(null, null).getCustomMetrics
    assert(metrics.isEmpty)
  }

  test("getCustomMetrics reassembles the parallel-array form into a Map") {
    val names = Array("Velox.InputEncoding.Flat", "Velox.InputEncoding.Dictionary")
    val counts = Array(123L, 7L)
    val metrics = newResult(names, counts).getCustomMetrics
    assert(metrics.size() == 2)
    assert(metrics.get("Velox.InputEncoding.Flat") == 123L)
    assert(metrics.get("Velox.InputEncoding.Dictionary") == 7L)
  }

  test("getCustomMetrics caches the reassembled map across calls") {
    val result = newResult(Array("k"), Array(1L))
    val firstCall = result.getCustomMetrics
    val secondCall = result.getCustomMetrics
    // Reference equality: the second call must hand back the cached instance.
    assert(firstCall eq secondCall)
  }

  test("getCustomMetrics returns an unmodifiable map") {
    val metrics = newResult(Array("k"), Array(1L)).getCustomMetrics
    intercept[UnsupportedOperationException] {
      metrics.put("x", 2L)
    }
  }

  test("getCustomMetrics is null-safe for code reading from older Gluten libs") {
    // Simulates a producer that supplies no custom-metrics arrays at all by
    // passing null/null. The null case itself is covered above; here we also
    // check that the resulting empty map is distinguishable from a populated
    // one.
    val emptyMetrics = newResult(null, null).getCustomMetrics
    val populatedMetrics = newResult(Array("a"), Array(1L)).getCustomMetrics
    assert(emptyMetrics.isEmpty)
    assert(!populatedMetrics.isEmpty)
  }

  test("getCustomMetrics fails loudly on mismatched key/value array lengths") {
    // Mismatched arrays must never yield a silently corrupted map. The first
    // call has to throw IllegalStateException with both lengths mentioned in
    // the message. Because nothing is cached on failure, a second call must
    // throw again rather than return a partial map.
    val result = newResult(Array("a", "b"), Array(1L))
    val firstFailure = intercept[IllegalStateException](result.getCustomMetrics)
    val message = firstFailure.getMessage
    assert(message.contains("2") && message.contains("1"))
    intercept[IllegalStateException](result.getCustomMetrics)
  }

  test("getCustomMetrics fails loudly when values array is null but keys is non-empty") {
    val result = newResult(Array("a"), null)
    val failure = intercept[IllegalStateException](result.getCustomMetrics)
    assert(failure.getMessage.contains("null"))
  }
}
Loading