Skip to content

Commit 62a68ac

Browse files
authored
feat: Implement metrics for iceberg compat (apache#2615)
1 parent d88a8f5 commit 62a68ac

15 files changed

Lines changed: 205 additions & 191 deletions

File tree

common/src/main/java/org/apache/comet/parquet/Native.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ public static native long initRecordBatchReader(
268268
int batchSize,
269269
boolean caseSensitive,
270270
Map<String, String> objectStoreOptions,
271-
CometFileKeyUnwrapper keyUnwrapper);
271+
CometFileKeyUnwrapper keyUnwrapper,
272+
Object metricsNode);
272273

273274
// arrow native version of read batch
274275

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ public class NativeBatchReader extends RecordReader<Void, ColumnarBatch> impleme
114114
private InternalRow partitionValues;
115115
private PartitionedFile file;
116116
private final Map<String, SQLMetric> metrics;
117+
// Unfortunately CometMetricNode is from the "spark" package and cannot be used directly here
118+
// TODO: Move it to common package?
119+
private Object metricsNode = null;
117120

118121
private StructType sparkSchema;
119122
private StructType dataSchema;
@@ -214,7 +217,8 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) {
214217
boolean useLegacyDateTimestamp,
215218
StructType partitionSchema,
216219
InternalRow partitionValues,
217-
Map<String, SQLMetric> metrics) {
220+
Map<String, SQLMetric> metrics,
221+
Object metricsNode) {
218222
this.conf = conf;
219223
this.capacity = capacity;
220224
this.sparkSchema = sparkSchema;
@@ -229,6 +233,7 @@ private NativeBatchReader(AbstractColumnReader[] columnReaders) {
229233
this.footer = footer;
230234
this.nativeFilter = nativeFilter;
231235
this.metrics = metrics;
236+
this.metricsNode = metricsNode;
232237
this.taskContext = TaskContext$.MODULE$.get();
233238
}
234239

@@ -436,7 +441,8 @@ public void init() throws Throwable {
436441
batchSize,
437442
caseSensitive,
438443
objectStoreOptions,
439-
keyUnwrapper);
444+
keyUnwrapper,
445+
metricsNode);
440446
}
441447
isInitialized = true;
442448
}

native/core/src/execution/metrics/utils.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,24 @@ use std::sync::Arc;
2727
/// Updates the metrics of a CometMetricNode. This function is called recursively to
2828
/// update the metrics of all the children nodes. The metrics are pulled from the
2929
/// native execution plan and pushed to the Java side through JNI.
30-
pub fn update_comet_metric(
30+
pub(crate) fn update_comet_metric(
3131
env: &mut JNIEnv,
3232
metric_node: &JObject,
3333
spark_plan: &Arc<SparkPlan>,
3434
) -> Result<(), CometError> {
35-
unsafe {
36-
let native_metric = to_native_metric_node(spark_plan);
37-
let jbytes = env.byte_array_from_slice(&native_metric?.encode_to_vec())?;
38-
jni_call!(env, comet_metric_node(metric_node).set_all_from_bytes(&jbytes) -> ())?;
35+
if metric_node.is_null() {
36+
return Ok(());
3937
}
40-
Ok(())
38+
39+
let native_metric = to_native_metric_node(spark_plan);
40+
let jbytes = env.byte_array_from_slice(&native_metric?.encode_to_vec())?;
41+
42+
unsafe { jni_call!(env, comet_metric_node(metric_node).set_all_from_bytes(&jbytes) -> ()) }
4143
}
4244

43-
pub fn to_native_metric_node(spark_plan: &Arc<SparkPlan>) -> Result<NativeMetricNode, CometError> {
45+
pub(crate) fn to_native_metric_node(
46+
spark_plan: &Arc<SparkPlan>,
47+
) -> Result<NativeMetricNode, CometError> {
4448
let mut native_metric_node = NativeMetricNode {
4549
metrics: HashMap::new(),
4650
children: Vec::new(),

native/core/src/execution/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! PoC of vectorization execution through JNI to Rust.
1919
pub mod expressions;
2020
pub mod jni_api;
21-
mod metrics;
21+
pub(crate) mod metrics;
2222
pub mod operators;
2323
pub(crate) mod planner;
2424
pub mod serde;

native/core/src/parquet/mod.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@ use jni::{
4646

4747
use self::util::jni::TypePromotionInfo;
4848
use crate::execution::jni_api::get_runtime;
49+
use crate::execution::metrics::utils::update_comet_metric;
4950
use crate::execution::operators::ExecutionError;
5051
use crate::execution::planner::PhysicalPlanner;
5152
use crate::execution::serde;
53+
use crate::execution::spark_plan::SparkPlan;
5254
use crate::execution::utils::SparkArrowConvert;
5355
use crate::jvm_bridge::{jni_new_global_ref, JVMClasses};
5456
use crate::parquet::data_type::AsBytes;
@@ -600,6 +602,8 @@ enum ParquetReaderState {
600602
}
601603
/// Parquet read context maintained across multiple JNI calls.
602604
struct BatchContext {
605+
native_plan: Arc<SparkPlan>,
606+
metrics_node: Arc<GlobalRef>,
603607
batch_stream: Option<SendableRecordBatchStream>,
604608
current_batch: Option<RecordBatch>,
605609
reader_state: ParquetReaderState,
@@ -697,6 +701,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
697701
case_sensitive: jboolean,
698702
object_store_options: JObject,
699703
key_unwrapper_obj: JObject,
704+
metrics_node: JObject,
700705
) -> jlong {
701706
try_unwrap_or_throw(&e, |mut env| unsafe {
702707
JVMClasses::init(&mut env);
@@ -776,6 +781,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
776781
let batch_stream = Some(scan.execute(partition_index, session_ctx.task_ctx())?);
777782

778783
let ctx = BatchContext {
784+
native_plan: Arc::new(SparkPlan::new(0, scan, vec![])),
785+
metrics_node: Arc::new(jni_new_global_ref!(env, metrics_node)?),
779786
batch_stream,
780787
current_batch: None,
781788
reader_state: ParquetReaderState::Init,
@@ -791,7 +798,7 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_readNextRecordBatch(
791798
_jclass: JClass,
792799
handle: jlong,
793800
) -> jint {
794-
try_unwrap_or_throw(&e, |_env| {
801+
try_unwrap_or_throw(&e, |mut env| {
795802
let context = get_batch_context(handle)?;
796803
let mut rows_read: i32 = 0;
797804
let batch_stream = context.batch_stream.as_mut().unwrap();
@@ -813,8 +820,11 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_readNextRecordBatch(
813820
Poll::Ready(None) => {
814821
// EOF
815822

816-
// TODO: (ARROW NATIVE) We can update metrics here
817-
// crate::execution::jni_api::update_metrics(&mut env, exec_context)?;
823+
update_comet_metric(
824+
&mut env,
825+
context.metrics_node.as_obj(),
826+
&context.native_plan,
827+
)?;
818828

819829
context.current_batch = None;
820830
context.reader_state = ParquetReaderState::Complete;

spark/src/main/scala/org/apache/comet/MetricsSupport.scala

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,38 +19,13 @@
1919

2020
package org.apache.comet
2121

22-
import org.apache.spark.SparkContext
23-
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
22+
import org.apache.spark.sql.execution.metric.SQLMetric
2423

2524
/**
2625
* A trait for Comet operators that support SQL metrics
2726
*/
2827
trait MetricsSupport {
2928
protected var metrics: Map[String, SQLMetric] = Map.empty
3029

31-
def initMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
32-
metrics = Map(
33-
"ParquetRowGroups" -> SQLMetrics.createMetric(
34-
sparkContext,
35-
"num of Parquet row groups read"),
36-
"ParquetNativeDecodeTime" -> SQLMetrics.createNanoTimingMetric(
37-
sparkContext,
38-
"time spent in Parquet native decoding"),
39-
"ParquetNativeLoadTime" -> SQLMetrics.createNanoTimingMetric(
40-
sparkContext,
41-
"time spent in loading Parquet native vectors"),
42-
"ParquetLoadRowGroupTime" -> SQLMetrics.createNanoTimingMetric(
43-
sparkContext,
44-
"time spent in loading Parquet row groups"),
45-
"ParquetInputFileReadTime" -> SQLMetrics.createNanoTimingMetric(
46-
sparkContext,
47-
"time spent in reading Parquet file from storage"),
48-
"ParquetInputFileReadSize" -> SQLMetrics.createSizeMetric(
49-
sparkContext,
50-
"read size when reading Parquet file from storage (MB)"),
51-
"ParquetInputFileReadThroughput" -> SQLMetrics.createAverageMetric(
52-
sparkContext,
53-
"read throughput when reading Parquet file from storage (MB/sec)"))
54-
metrics
55-
}
30+
def getMetrics: Map[String, SQLMetric] = metrics
5631
}

spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.sql.SparkSession
3030
import org.apache.spark.sql.catalyst.InternalRow
3131
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
3232
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
33+
import org.apache.spark.sql.comet.CometMetricNode
3334
import org.apache.spark.sql.execution.datasources.DataSourceUtils
3435
import org.apache.spark.sql.execution.datasources.PartitionedFile
3536
import org.apache.spark.sql.execution.datasources.RecordReaderIterator
@@ -56,10 +57,14 @@ import org.apache.comet.vector.CometVector
5657
* in [[org.apache.comet.CometSparkSessionExtensions]]
5758
* - `buildReaderWithPartitionValues`, so Spark calls Comet's Parquet reader to read values.
5859
*/
59-
class CometParquetFileFormat(scanImpl: String)
60+
class CometParquetFileFormat(session: SparkSession, scanImpl: String)
6061
extends ParquetFileFormat
6162
with MetricsSupport
6263
with ShimSQLConf {
64+
metrics =
65+
CometMetricNode.nativeScanMetrics(session.sparkContext) ++ CometMetricNode.parquetScanMetrics(
66+
session.sparkContext)
67+
6368
override def shortName(): String = "parquet"
6469
override def toString: String = "CometParquet"
6570
override def hashCode(): Int = getClass.hashCode()
@@ -164,7 +169,8 @@ class CometParquetFileFormat(scanImpl: String)
164169
datetimeRebaseSpec.mode == CORRECTED,
165170
partitionSchema,
166171
file.partitionValues,
167-
metrics.asJava)
172+
metrics.asJava,
173+
CometMetricNode(metrics))
168174
try {
169175
batchReader.init()
170176
} catch {

spark/src/main/scala/org/apache/comet/parquet/CometParquetScan.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import scala.jdk.CollectionConverters._
2323

2424
import org.apache.hadoop.conf.Configuration
2525
import org.apache.spark.sql.SparkSession
26+
import org.apache.spark.sql.comet.CometMetricNode
2627
import org.apache.spark.sql.connector.read.PartitionReaderFactory
2728
import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
2829
import org.apache.spark.sql.execution.datasources.v2.FileScan
@@ -34,6 +35,8 @@ import org.apache.spark.util.SerializableConfiguration
3435

3536
import org.apache.comet.MetricsSupport
3637

38+
// TODO: Consider creating a case class and patch SQL tests if needed, will make life easier.
39+
// currently hacking around this by setting the metrics within the object's apply method.
3740
trait CometParquetScan extends FileScan with MetricsSupport {
3841
def sparkSession: SparkSession
3942
def hadoopConf: Configuration
@@ -70,8 +73,8 @@ trait CometParquetScan extends FileScan with MetricsSupport {
7073
}
7174

7275
object CometParquetScan {
73-
def apply(scan: ParquetScan): CometParquetScan =
74-
new ParquetScan(
76+
def apply(session: SparkSession, scan: ParquetScan): CometParquetScan = {
77+
val newScan = new ParquetScan(
7578
scan.sparkSession,
7679
scan.hadoopConf,
7780
scan.fileIndex,
@@ -82,4 +85,10 @@ object CometParquetScan {
8285
scan.options,
8386
partitionFilters = scan.partitionFilters,
8487
dataFilters = scan.dataFilters) with CometParquetScan
88+
89+
newScan.metrics = CometMetricNode.nativeScanMetrics(session.sparkContext) ++ CometMetricNode
90+
.parquetScanMetrics(session.sparkContext)
91+
92+
newScan
93+
}
8594
}

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
203203
return withInfos(scanExec, fallbackReasons.toSet)
204204
}
205205

206-
if (scanImpl != CometConf.SCAN_NATIVE_COMET && encryptionEnabled(hadoopConf)) {
206+
if (encryptionEnabled(hadoopConf) && scanImpl != CometConf.SCAN_NATIVE_COMET) {
207207
if (!isEncryptionConfigSupported(hadoopConf)) {
208208
return withInfos(scanExec, fallbackReasons.toSet)
209209
}
@@ -257,7 +257,7 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
257257
}
258258

259259
if (schemaSupported && partitionSchemaSupported && scan.pushedAggregate.isEmpty) {
260-
val cometScan = CometParquetScan(scanExec.scan.asInstanceOf[ParquetScan])
260+
val cometScan = CometParquetScan(session, scanExec.scan.asInstanceOf[ParquetScan])
261261
CometBatchScanExec(
262262
scanExec.copy(scan = cometScan),
263263
runtimeFilters = scanExec.runtimeFilters)

spark/src/main/scala/org/apache/spark/sql/comet/CometBatchScanExec.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,13 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres
5151
override lazy val inputRDD: RDD[InternalRow] = wrappedScan.inputRDD
5252

5353
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
54+
val rdd = inputRDD.asInstanceOf[RDD[ColumnarBatch]]
55+
56+
// These metrics are important for streaming solutions.
57+
// despite there being similar metrics published by the native reader.
5458
val numOutputRows = longMetric("numOutputRows")
5559
val scanTime = longMetric("scanTime")
56-
inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
60+
rdd.mapPartitionsInternal { batches =>
5761
new Iterator[ColumnarBatch] {
5862

5963
override def hasNext: Boolean = {
@@ -137,16 +141,12 @@ case class CometBatchScanExec(wrapped: BatchScanExec, runtimeFilters: Seq[Expres
137141
wrapped
138142
}
139143

140-
override lazy val metrics: Map[String, SQLMetric] = Map(
141-
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
142-
"scanTime" -> SQLMetrics.createNanoTimingMetric(
143-
sparkContext,
144-
"scan time")) ++ wrapped.customMetrics ++ {
145-
wrapped.scan match {
146-
case s: MetricsSupport => s.initMetrics(sparkContext)
144+
override lazy val metrics: Map[String, SQLMetric] =
145+
wrappedScan.customMetrics ++ CometMetricNode.baseScanMetrics(
146+
session.sparkContext) ++ (scan match {
147+
case s: MetricsSupport => s.getMetrics
147148
case _ => Map.empty
148-
}
149-
}
149+
})
150150

151151
@transient override lazy val partitions: Seq[Seq[InputPartition]] = wrappedScan.partitions
152152

0 commit comments

Comments
 (0)