diff --git a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java index 106d8b98170..28ed5514a9c 100644 --- a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java +++ b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java @@ -35,9 +35,9 @@ public long rtHandle() { return runtime.getHandle(); } - public static native void clearHashTable(long hashTableData); + public static native void clearHashTable(String cacheKey, long hashTableData); - public static native long cloneHashTable(long hashTableData); + public static native long cloneHashTable(String cacheKey, long hashTableData); public native long nativeBuild( String buildHashTableId, diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala index 1554c4ddd3e..5ddb9d9a7b8 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala @@ -106,7 +106,7 @@ case class BroadcastHashJoinExecTransformer( isNullAwareAntiJoin) { // Unique ID for built table - lazy val buildBroadcastTableId: String = buildPlan.id.toString + lazy val buildBroadcastTableId: String = canonicalBuildHashTableId(buildPlan) override protected lazy val substraitJoinType: JoinRel.JoinType = joinType match { case _: InnerLike => diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala index 535fd8900e1..c8fcd574cdd 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala @@ -103,7 +103,7 @@ object VeloxBroadcastBuildSideCache } } - HashJoinBuilder.clearHashTable(value.pointer) + HashJoinBuilder.clearHashTable(key, value.pointer) } } } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index fea9f149745..f640fde9eaa 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -234,7 +234,7 @@ case class ColumnarBuildSideRelation( (hashTableData, this) } else { - (HashJoinBuilder.cloneHashTable(hashTableData), null) + (HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null) } } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala index fbc329f3606..de783de4892 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala @@ -204,7 +204,7 @@ class UnsafeColumnarBuildSideRelation( (hashTableData, this) } else { - (HashJoinBuilder.cloneHashTable(hashTableData), null) + (HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null) } } diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index aa4d9599435..aab855a31f6 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -46,6 +46,7 @@ #include "velox/common/base/BloomFilter.h" #include "velox/common/file/FileSystems.h" #include "velox/exec/HashTable.h" +#include "velox/exec/HashTableCache.h" #ifdef GLUTEN_ENABLE_GPU #include "cudf/CudfPlanValidator.h" @@ -1060,6 +1061,12 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native nullptr); builder->setHashTable(std::move(mainTable)); + auto* cache = facebook::velox::exec::HashTableCache::instance(); + + if (!cache->hasTable(hashTableId)) { + cache->injectTable(hashTableId, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool()); + } + return gluten::getHashTableObjStore()->save(builder); } @@ -1138,6 +1145,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native } hashTableBuilders[0]->setHashTable(std::move(mainTable)); + + auto* cache = facebook::velox::exec::HashTableCache::instance(); + if (!cache->hasTable(hashTableId)) { + cache->injectTable( + hashTableId, + hashTableBuilders[0]->hashTable(), + hashTableBuilders[0]->joinHasNullKeys(), + defaultLeafVeloxMemoryPool()); + } + return gluten::getHashTableObjStore()->save(hashTableBuilders[0]); JNI_METHOD_END(kInvalidObjectHandle) } @@ -1145,9 +1162,17 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_cloneHashTable( // NOLINT JNIEnv* env, jclass, + jstring cacheKey, jlong tableHandler) { JNI_METHOD_START + auto cacheKeyStr = jStringToCString(env, cacheKey); auto hashTableHandler = ObjectStore::retrieve(tableHandler); + auto* cache = facebook::velox::exec::HashTableCache::instance(); + if (!cache->hasTable(cacheKeyStr)) { + cache->injectTable( + cacheKeyStr, hashTableHandler->hashTable(), hashTableHandler->joinHasNullKeys(), defaultLeafVeloxMemoryPool()); + } + return gluten::getHashTableObjStore()->save(hashTableHandler); JNI_METHOD_END(kInvalidObjectHandle) } @@ -1155,13 +1180,15 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_cloneH JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_clearHashTable( // NOLINT JNIEnv* env, jclass, + jstring cacheKey, jlong tableHandler) { JNI_METHOD_START - auto hashTableHandler = ObjectStore::retrieve(tableHandler); - hashTableHandler->hashTable()->clear(true); + auto cacheKeyStr = jStringToCString(env, cacheKey); + facebook::velox::exec::HashTableCache::instance()->drop(cacheKeyStr); ObjectStore::release(tableHandler); JNI_METHOD_END() } + #ifdef __cplusplus } #endif diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 5477176ce85..1465b284c2f 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -448,29 +448,11 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: } else if ( sJoin.has_advanced_extension() && SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isBHJ=")) { - std::string hashTableId = sJoin.hashtableid(); - - std::shared_ptr opaqueSharedHashTable = nullptr; - bool joinHasNullKeys = false; - - try { - auto hashTableBuilder = ObjectStore::retrieve(getJoin(hashTableId)); - joinHasNullKeys = hashTableBuilder->joinHasNullKeys(); - auto originalShared = hashTableBuilder->hashTable(); - opaqueSharedHashTable = std::shared_ptr( - originalShared, reinterpret_cast(originalShared.get())); - - LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId; - } catch (const std::exception& e) { - LOG(WARNING) - << "Error retrieving HashTable from ObjectStore: " << e.what() - << ". Falling back to building new table. To ensure correct results, please verify that spark.gluten.velox.buildHashTableOncePerExecutor.enabled is set to false."; - opaqueSharedHashTable = nullptr; - } - + const auto& hashTableId = sJoin.hashtableid(); + const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId; // Create HashJoinNode node return std::make_shared( - nextPlanNodeId(), + joinNodeId, joinType, isNullAwareAntiJoin, leftKeys, @@ -479,14 +461,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: leftNode, rightNode, getJoinOutputType(leftNode, rightNode, joinType), - false, - false, - joinHasNullKeys, - opaqueSharedHashTable); + true); } else { // Create HashJoinNode node + auto joinNodeId = nextPlanNodeId(); return std::make_shared( - nextPlanNodeId(), + joinNodeId, joinType, isNullAwareAntiJoin, leftKeys, diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index 793e1b212c5..38dc1617d5d 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -17,9 +17,9 @@ set -exu CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) -VELOX_REPO=https://github.com/IBM/velox.git -VELOX_BRANCH=dft-2026_05_27 -VELOX_ENHANCED_BRANCH=ibm-2026_05_27 +VELOX_REPO=https://github.com/JkSelf/velox.git +VELOX_BRANCH=dft-2026_05_27-hashtable-cache +VELOX_ENHANCED_BRANCH=ibm-2026_05_27-hashtable-cache VELOX_HOME="" RUN_SETUP_SCRIPT=ON ENABLE_ENHANCED_FEATURES=OFF diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala index 149a0ca729e..3964c12e10e 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala @@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.{ExpandOutputPartitioningShim, ExplainUtils, SparkPlan} +import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.joins.{BaseJoinExec, HashedRelationBroadcastMode, HashJoin} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types._ @@ -105,6 +107,15 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { (right, left) } + protected def canonicalBuildHashTableId(plan: SparkPlan): String = plan match { + case b: BroadcastQueryStageExec => + canonicalBuildHashTableId(b.plan) + case r: ReusedExchangeExec => + canonicalBuildHashTableId(r.child) + case other => + other.id.toString + } + def sameType(from: DataType, to: DataType): Boolean = { (from, to) match { case (ArrayType(fromElement, _), ArrayType(toElement, _)) => @@ -267,7 +278,7 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport { inputBuildOutput, context, operatorId, - buildPlan.id.toString + canonicalBuildHashTableId(buildPlan) ) context.registerJoinParam(operatorId, joinParams) @@ -392,7 +403,7 @@ abstract class BroadcastHashJoinExecTransformerBase( override def hashJoinType: JoinType = joinType // Unique ID for builded hash table - lazy val buildHashTableId: String = "BuiltHashTable-" + buildPlan.id + lazy val buildHashTableId: String = canonicalBuildHashTableId(buildPlan) override def genJoinParametersInternal(): (Int, Int, String) = { (1, if (isNullAwareAntiJoin) 1 else 0, buildHashTableId) diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala index 6ccddac0c62..6f1272b7f77 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala @@ -522,7 +522,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000", - SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName, + SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false" + ) { val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT a FROM testData join testData2 ON key = a " + "where value >= (" +