[VL][TEST]Use Velox's HashTableCache to cache the BHJ's HashTable#12163
[VL][TEST]Use Velox's HashTableCache to cache the BHJ's HashTable#12163JkSelf wants to merge 6 commits into
Conversation
|
Run Gluten Clickhouse CI on x86 |
63a6382 to
10aec74
Compare
|
Run Gluten Clickhouse CI on x86 |
There was a problem hiding this comment.
Pull request overview
This PR wires Velox’s HashTableCache into Gluten’s Velox broadcast hash join (BHJ) path by stabilizing the build-side hash table identifier across AQE wrappers and propagating Spark execution metadata down into the native runtime to support cache scoping/reuse.
Changes:
- Canonicalize BHJ build hash table IDs across
BroadcastQueryStageExec/ReusedExchangeExecso reuse paths share a stable cache key. - Add Spark SQL execution id propagation (Java → JNI → native) and use it in Velox task/query identifiers.
- Integrate Velox
HashTableCacheinjection/drop in native BHJ build/cleanup, and update Velox-side build relation/cache call sites accordingly.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala | Adjusts AQE test conf to avoid an optimizer rule impacting reuse scenarios. |
| gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala | Introduces canonical build hash table ID derivation across AQE wrappers and uses it in join parameters. |
| gluten-arrow/src/main/java/org/apache/gluten/vectorized/PlanEvaluatorJniWrapper.java | Extends JNI kernel creation signature to include Spark execution id. |
| gluten-arrow/src/main/java/org/apache/gluten/vectorized/NativePlanEvaluator.java | Extracts Spark execution id from task local properties and passes it to JNI. |
| ep/build-velox/src/get-velox.sh | Changes default Velox repo/branch selection for builds. |
| cpp/velox/substrait/SubstraitToVeloxPlan.cc | Updates BHJ plan construction to align with Velox-side hash table caching behavior. |
| cpp/velox/jni/VeloxJniWrapper.cc | Injects built hash tables into Velox HashTableCache and updates clone/clear JNI APIs to use cache keys. |
| cpp/velox/compute/WholeStageResultIterator.cc | Uses Spark execution id for Velox task/query identification. |
| cpp/core/jni/JniWrapper.cc | Propagates execution id into native SparkTaskInfo. |
| cpp/core/compute/Runtime.h | Extends SparkTaskInfo with execution id and updates formatting. |
| backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala | Updates hash table “clone” call to pass cache key. |
| backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala | Updates hash table “clone” call to pass cache key. |
| backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala | Updates hash table clear to drop by cache key. |
| backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala | Uses canonical build hash table ID for broadcast-table resource tracking and reuse. |
| backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java | Updates native API signatures to include cache key for clone/clear. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -111,11 +124,7 @@ WholeStageResultIterator::WholeStageResultIterator( | |||
| velox::core::PlanFragment planFragment{planNode, velox::core::ExecutionStrategy::kUngrouped, 1, emptySet}; | |||
| std::shared_ptr<velox::core::QueryCtx> queryCtx = createNewVeloxQueryCtx(); | |||
| task_ = velox::exec::Task::create( | |||
| fmt::format( | |||
| "Gluten_Stage_{}_TID_{}_VTID_{}", | |||
| std::to_string(taskInfo_.stageId), | |||
| std::to_string(taskInfo_.taskId), | |||
| std::to_string(taskInfo.vId)), | |||
| getVeloxTaskId(taskInfo_), | |||
| std::move(planFragment), | |||
| 0, | |||
| CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) | ||
| VELOX_REPO=https://github.com/IBM/velox.git | ||
| VELOX_BRANCH=dft-2026_06_04 | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_04 | ||
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_04-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_04-hashtable-cache |
10aec74 to
581f7e5
Compare
|
Run Gluten Clickhouse CI on x86 |
581f7e5 to
824da36
Compare
|
Run Gluten Clickhouse CI on x86 |
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_05-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_05-hashtable-cache |
| std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { | ||
| if (taskInfo.executionId != -1) { | ||
| return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); | ||
| } | ||
| return fmt::format( | ||
| "Gluten_Stage_{}_TID_{}_VTID_{}", | ||
| std::to_string(taskInfo.stageId), | ||
| std::to_string(taskInfo.taskId), | ||
| std::to_string(taskInfo.vId)); | ||
| } |
| const auto& hashTableId = sJoin.hashtableid(); | ||
| const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId; | ||
| // Create HashJoinNode node | ||
| return std::make_shared<core::HashJoinNode>( | ||
| nextPlanNodeId(), | ||
| joinNodeId, |
| // Unique ID for builded hash table | ||
| lazy val buildHashTableId: String = "BuiltHashTable-" + buildPlan.id | ||
| lazy val buildHashTableId: String = canonicalBuildHashTableId(buildPlan) | ||
|
|
| // Unique ID for built table | ||
| lazy val buildBroadcastTableId: String = buildPlan.id.toString | ||
| lazy val buildBroadcastTableId: String = canonicalBuildHashTableId(buildPlan) | ||
|
|
824da36 to
2f34754
Compare
|
Run Gluten Clickhouse CI on x86 |
2f34754 to
677658b
Compare
|
Run Gluten Clickhouse CI on x86 |
| std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { | ||
| if (taskInfo.executionId != -1) { | ||
| return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); | ||
| } | ||
| return fmt::format("Gluten_Execution_{}", ""); | ||
| } |
| val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) | ||
| var cacheKey = "" | ||
| if (executionId != null) { | ||
| cacheKey = "Gluten_Execution_" + executionId + ":" + buildHashTableId | ||
| GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId) |
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_05-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_05-hashtable-cache |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
| std::string getVeloxTaskId(const SparkTaskInfo& taskInfo) { | ||
| if (taskInfo.executionId != -1) { | ||
| return fmt::format("Gluten_Execution_{}", std::to_string(taskInfo.executionId)); | ||
| } | ||
| return fmt::format("Gluten_Execution_{}", ""); | ||
| } |
|
|
||
| size_t pos = hashTableId.find(':'); | ||
| auto planNodeId = hashTableId.substr(pos + 1); | ||
| std::cout << "the cacheKey is " << hashTableId << " the planNode id is " << planNodeId << std::endl; |
| size_t pos = hashTableId.find(':'); | ||
| auto planNodeId = hashTableId.substr(pos + 1); | ||
| std::cout << "the cacheKey is " << hashTableId << " the planNode id is " << planNodeId << std::endl; | ||
| const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : planNodeId; | ||
|
|
| const auto abandonHashBuildDedupMinPct = | ||
| queryConf.get<uint32_t>(kAbandonDedupHashMapMinPct, kAbandonDedupHashMapMinPctDefault); | ||
| const auto hashTableId = jStringToCString(env, tableId); | ||
| std::cout << "the cacheKey in nativeBuild is " << hashTableId << std::endl; |
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_05-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_05-hashtable-cache |
| enableSuite[GlutenTableScanSuite] | ||
| .includeByPrefix("Caching") |
|
Run Gluten Clickhouse CI on x86 |
96e888a to
67b5711
Compare
d2cb709 to
05d541c
Compare
|
Run Gluten Clickhouse CI on x86 |
| protected def canonicalBuildHashTableId(plan: SparkPlan): String = plan match { | ||
| case b: BroadcastQueryStageExec => | ||
| canonicalBuildHashTableId(b.plan) | ||
| case r: ReusedExchangeExec => | ||
| canonicalBuildHashTableId(r.child) | ||
| case other => | ||
| other.id.toString | ||
| } |
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_12-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_12-hashtable-cache |
05d541c to
5c65ed6
Compare
|
Run Gluten Clickhouse CI on x86 |
5c65ed6 to
1ff7db2
Compare
|
Run Gluten Clickhouse CI on x86 |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
cpp/velox/substrait/SubstraitToVeloxPlan.cc:465
- In the BHJ branch,
joinNodeIdis computed but never used, and callingnextPlanNodeId()twice can skip an ID and leave an unused local variable. This can also fail builds if-Werror=unused-variableis enabled.
Consider removing the unused joinNodeId and reuse hashTableId for the ctor argument that needs it.
const auto& hashTableId = sJoin.hashtableid();
const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId;
// Create HashJoinNode node
return std::make_shared<core::HashJoinNode>(
nextPlanNodeId(),
joinType,
| VELOX_REPO=https://github.com/JkSelf/velox.git | ||
| VELOX_BRANCH=dft-2026_06_12-hashtable-cache | ||
| VELOX_ENHANCED_BRANCH=ibm-2026_06_12-hashtable-cache |
|
Run Gluten Clickhouse CI on x86 |
de6a0e4 to
01676f5
Compare
|
Run Gluten Clickhouse CI on x86 |
| const auto& hashTableId = sJoin.hashtableid(); | ||
| const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId; | ||
| // Create HashJoinNode node | ||
| return std::make_shared<core::HashJoinNode>( | ||
| nextPlanNodeId(), |
| auto* cache = facebook::velox::exec::HashTableCache::instance(); | ||
|
|
||
| if (!cache->exist(hashTableId)) { | ||
| cache->add(hashTableId, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool()); | ||
| } |
| override def hashJoinType: JoinType = joinType | ||
|
|
||
| lazy val buildHashTableId: String = buildPlan.id.toString | ||
| // Unique ID for builded hash table |
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java:44
- The PR title is tagged as
[TEST], but this change set includes production code changes across Scala transformers and JNI/C++ Velox integration (not just tests). Consider updating the title/tag to reflect that this is a functional Velox backend change.
public static native void clearHashTable(String cacheKey, long hashTableData);
public static native long cloneHashTable(String cacheKey, long hashTableData);
public native long nativeBuild(
String buildHashTableId,
long[] batchHandlers,
| const auto& hashTableId = sJoin.hashtableid(); | ||
| const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId; | ||
| // Create HashJoinNode node | ||
| return std::make_shared<core::HashJoinNode>( | ||
| nextPlanNodeId(), |
| // Unique ID for builded hash table | ||
| lazy val buildHashTableId: String = canonicalBuildHashTableId(buildPlan) |
| override def joinBuildSide: BuildSide = buildSide | ||
| override def hashJoinType: JoinType = joinType | ||
|
|
||
| lazy val buildHashTableId: String = buildPlan.id.toString | ||
| // Unique ID for builded hash table | ||
| lazy val buildHashTableId: String = canonicalBuildHashTableId(buildPlan) | ||
|
|
||
| override def genJoinParametersInternal(): (Int, Int, String) = { | ||
| (1, if (isNullAwareAntiJoin) 1 else 0, buildHashTableId) |
What changes are proposed in this pull request?
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?