Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -740,30 +740,32 @@ class ColumnarCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer
if heavy batch is encountered */
batch => VeloxColumnarBatches.ensureVeloxBatch(batch)
}
// Hoist the per-partition StructType out of the per-batch hot path: schema is constant
// for the lifetime of this iterator, so allocating one StructType per CachedBatch wastes
// GC for the many-small-batch case.
// Hoist per-partition-iterator constants out of the per-batch hot path:
// schema, backend name, partition-stats conf, and the JNI wrapper are all
// fixed for the lifetime of this iterator. Allocating them per CachedBatch
// wastes GC in the many-small-batch case; GlutenConfig.get in particular
// allocates a fresh GlutenConfig(SQLConf.get) on every call.
val structSchema = StructType(
schema.map(a => StructField(a.name, a.dataType, a.nullable)))
val backendName = BackendsApiManager.getBackendName
val partitionStatsEnabled =
GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
val jni = ColumnarBatchSerializerJniWrapper.create(
Runtimes.contextInstance(
backendName,
"ColumnarCachedBatchSerializer#serialize"))
new Iterator[CachedBatch] {
override def hasNext: Boolean = veloxBatches.hasNext

override def next(): CachedBatch = {
val batch = veloxBatches.next()
val jni = ColumnarBatchSerializerJniWrapper.create(
Runtimes.contextInstance(
BackendsApiManager.getBackendName,
"ColumnarCachedBatchSerializer#serialize"))
val handle =
ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName, batch)
val handle = ColumnarBatches.getNativeHandle(backendName, batch)
// Route through serializeWithStats when the partition-stats conf is enabled and the
// JNI extension is linked in libgluten.so. Capability is detected lazily at the
// call site: a new Gluten jar paired with an older native library will throw
// UnsatisfiedLinkError on the first invocation; we catch it once, cache the
// result, and fall back to the legacy serialize() path emitting stats=null. The
// buildFilter wrapper directs such batches through without pruning.
val partitionStatsEnabled =
GlutenConfig.get.getConf(GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED)
if (partitionStatsEnabled && ColumnarCachedBatchSerializer.statsExtAvailable) {
try {
val framed = jni.serializeWithStats(handle)
Expand Down
Loading