From 86f6eb6b8fc28bda52fc2aa99a71273ba49801b8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 22 Jan 2026 12:24:25 -0700 Subject: [PATCH 1/2] perf: reduce GC pressure in protobuf serialization Replace ByteArrayOutputStream with direct CodedOutputStream serialization to eliminate unnecessary allocations during query plan serialization. This optimization: - Pre-allocates exact buffer size using getSerializedSize() - Eliminates ByteArrayOutputStream's internal buffer resizing - Removes defensive array copying from toByteArray() - Applies to 5 hot paths called per-partition during query execution For a query with 1000 partitions, this eliminates 5000+ unnecessary allocations and array copies, significantly reducing GC pressure. Changes: - operators.scala: getCometIterator() and convertBlock() - CometNativeWriteExec.scala: serializedPlanOpt() and doExecute() - ParquetFilters.scala: createNativeFilters() --- .../apache/comet/parquet/ParquetFilters.scala | 12 +++++++---- .../sql/comet/CometNativeWriteExec.scala | 21 ++++++++++++------- .../apache/spark/sql/comet/operators.scala | 20 +++++++++++------- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala index dbc3e17f83..5e63199e41 100644 --- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala +++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala @@ -43,6 +43,8 @@ import org.apache.spark.sql.sources import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.UTF8String +import com.google.protobuf.CodedOutputStream + import org.apache.comet.parquet.SourceFilterSerde.{createBinaryExpr, createNameExpr, createUnaryExpr, createValueExpr} import org.apache.comet.serde.ExprOuterClass import org.apache.comet.serde.QueryPlanSerde.scalarFunctionExprToProto @@ -885,10 +887,12 @@ class ParquetFilters( def createNativeFilters(predicates: Seq[sources.Filter]): Option[Array[Byte]] = { predicates.reduceOption(sources.And).flatMap(createNativeFilter).map { expr => - val outputStream = new ByteArrayOutputStream() - expr.writeTo(outputStream) - outputStream.close() - outputStream.toByteArray + val size = expr.getSerializedSize + val bytes = new Array[Byte](size) + val codedOutput = CodedOutputStream.newInstance(bytes) + expr.writeTo(codedOutput) + codedOutput.checkNoSpaceLeft() + bytes } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala index f153a691ef..04625b2b04 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala @@ -34,6 +34,8 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils +import com.google.protobuf.CodedOutputStream + import org.apache.comet.CometExecIterator import org.apache.comet.serde.OperatorOuterClass.Operator @@ -75,10 +77,12 @@ case class CometNativeWriteExec( sparkContext.collectionAccumulator[FileCommitProtocol.TaskCommitMessage]("taskCommitMessages") override def serializedPlanOpt: SerializedPlan = { - val outputStream = new ByteArrayOutputStream() - nativeOp.writeTo(outputStream) - outputStream.close() - SerializedPlan(Some(outputStream.toByteArray)) + val size = nativeOp.getSerializedSize + val bytes = new Array[Byte](size) + val codedOutput = CodedOutputStream.newInstance(bytes) + nativeOp.writeTo(codedOutput) + codedOutput.checkNoSpaceLeft() + SerializedPlan(Some(bytes)) } override def withNewChildInternal(newChild: SparkPlan): SparkPlan = @@ -196,10 +200,11 @@ case class CometNativeWriteExec( val nativeMetrics = CometMetricNode.fromCometPlan(this) - val outputStream = new ByteArrayOutputStream() - modifiedNativeOp.writeTo(outputStream) - outputStream.close() - val planBytes = outputStream.toByteArray + val size = modifiedNativeOp.getSerializedSize + val planBytes = new Array[Byte](size) + val codedOutput = CodedOutputStream.newInstance(planBytes) + modifiedNativeOp.writeTo(codedOutput) + codedOutput.checkNoSpaceLeft() val execIterator = new CometExecIterator( CometExec.newIterId, diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 0a435e5b7a..a19449a7ab 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -50,6 +50,7 @@ import org.apache.spark.util.SerializableConfiguration import org.apache.spark.util.io.ChunkedByteBuffer import com.google.common.base.Objects +import com.google.protobuf.CodedOutputStream import org.apache.comet.{CometConf, CometExecIterator, CometRuntimeException, ConfigEntry} import org.apache.comet.CometSparkSessionExtensions.{isCometShuffleEnabled, withInfo} @@ -139,10 +140,11 @@ object CometExec { partitionIdx: Int, broadcastedHadoopConfForEncryption: Option[Broadcast[SerializableConfiguration]], encryptedFilePaths: Seq[String]): CometExecIterator = { - val outputStream = new ByteArrayOutputStream() - nativePlan.writeTo(outputStream) - outputStream.close() - val bytes = outputStream.toByteArray + val size = nativePlan.getSerializedSize + val bytes = new Array[Byte](size) + val codedOutput = CodedOutputStream.newInstance(bytes) + nativePlan.writeTo(codedOutput) + codedOutput.checkNoSpaceLeft() new CometExecIterator( newIterId, inputs, @@ -414,10 +416,12 @@ abstract class CometNativeExec extends CometExec { def convertBlock(): CometNativeExec = { def transform(arg: Any): AnyRef = arg match { case serializedPlan: SerializedPlan if serializedPlan.isEmpty => - val out = new ByteArrayOutputStream() - nativeOp.writeTo(out) - out.close() - SerializedPlan(Some(out.toByteArray)) + val size = nativeOp.getSerializedSize + val bytes = new Array[Byte](size) + val codedOutput = CodedOutputStream.newInstance(bytes) + nativeOp.writeTo(codedOutput) + codedOutput.checkNoSpaceLeft() + SerializedPlan(Some(bytes)) case other: AnyRef => other case null => null } From 0a6f97d73c98961a57efd3807544ae3342dba11b Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 22 Jan 2026 13:16:27 -0700 Subject: [PATCH 2/2] fix --- .../main/scala/org/apache/comet/parquet/ParquetFilters.scala | 1 - .../scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala | 2 -- spark/src/main/scala/org/apache/spark/sql/comet/operators.scala | 1 - 3 files changed, 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala index 5e63199e41..f8da68d59f 100644 --- a/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala +++ b/spark/src/main/scala/org/apache/comet/parquet/ParquetFilters.scala @@ -19,7 +19,6 @@ package org.apache.comet.parquet -import java.io.ByteArrayOutputStream import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort} import java.math.{BigDecimal => JBigDecimal} import java.sql.{Date, Timestamp} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala index 04625b2b04..39e7ac6eef 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometNativeWriteExec.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.comet -import java.io.ByteArrayOutputStream - import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index a19449a7ab..f4f97b8312 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.comet -import java.io.ByteArrayOutputStream import java.util.Locale import scala.collection.mutable