From 446ac61e8895a7837293bb958ef0aa0ee4478ef9 Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 28 May 2026 17:09:07 +0800 Subject: [PATCH] [VL] Hoist per-partition constants out of ColumnarCachedBatchSerializer.serialize hot path In convertInternalRowToCachedBatch, three values that are constant for the lifetime of the per-partition Iterator[CachedBatch] were being re-evaluated on every next() call: 1. BackendsApiManager.getBackendName (twice per batch) 2. GlutenConfig.get.getConf(COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED) -- GlutenConfig.get allocates a fresh GlutenConfig(SQLConf.get) on every call (GlutenConfig.scala L584-L586) 3. ColumnarBatchSerializerJniWrapper.create(Runtimes.contextInstance(...)) Hoist all three out of next() into the mapPartitions body, alongside the structSchema value that the same block already hoists for the same many-small-batch GC-pressure reason. Only the per-batch handle remains inside next() since it depends on the batch. Wire format is byte-identical. Pure refactor with no new test file; behavior fully covered by ColumnarCachedBatchKryoSuite and ColumnarCachedBatchKryoBoundaryProbeBugSuite (7 tests, all green locally on -Pspark-4.1 -Pscala-2.13). refs: todos/features/gluten-ccbs-iterator-hoist/docs/0002-decision.md refs: todos/features/gluten-ccbs-iterator-hoist/docs/0003-implementation-plan.md --- .../ColumnarCachedBatchSerializer.scala | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala index bb548b2f9ce..18e2d56bd1c 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarCachedBatchSerializer.scala @@ -740,30 +740,32 @@ class ColumnarCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer if heavy batch is encountered */ batch => VeloxColumnarBatches.ensureVeloxBatch(batch) } - // Hoist the per-partition StructType out of the per-batch hot path: schema is constant - // for the lifetime of this iterator, so allocating one StructType per CachedBatch wastes - // GC for the many-small-batch case. + // Hoist per-partition-iterator constants out of the per-batch hot path: + // schema, backend name, partition-stats conf, and the JNI wrapper are all + // fixed for the lifetime of this iterator. Allocating them per CachedBatch + // wastes GC in the many-small-batch case; GlutenConfig.get in particular + // allocates a fresh GlutenConfig(SQLConf.get) on every call. val structSchema = StructType( schema.map(a => StructField(a.name, a.dataType, a.nullable))) + val backendName = BackendsApiManager.getBackendName + val partitionStatsEnabled = + GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED) + val jni = ColumnarBatchSerializerJniWrapper.create( + Runtimes.contextInstance( + backendName, + "ColumnarCachedBatchSerializer#serialize")) new Iterator[CachedBatch] { override def hasNext: Boolean = veloxBatches.hasNext override def next(): CachedBatch = { val batch = veloxBatches.next() - val jni = ColumnarBatchSerializerJniWrapper.create( - Runtimes.contextInstance( - BackendsApiManager.getBackendName, - "ColumnarCachedBatchSerializer#serialize")) - val handle = - ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch) + val handle = ColumnarBatches.getNativeHandle(backendName, batch) // Route through serializeWithStats when the partition-stats conf is enabled and the // JNI extension is linked in libgluten.so. Capability is detected lazily at the // call site: a new Gluten jar paired with an older native library will throw // UnsatisfiedLinkError on the first invocation; we catch it once, cache the // result, and fall back to the legacy serialize() path emitting stats=null. The // buildFilter wrapper directs such batches through without pruning. - val partitionStatsEnabled = - GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED) if (partitionStatsEnabled && ColumnarCachedBatchSerializer.statsExtAvailable) { try { val framed = jni.serializeWithStats(handle)