Skip to content

Commit 1ff7db2

Browse files
committed
Use Velox's HashTableCache to cache the BHJ's HashTable
1 parent 3a13aef commit 1ff7db2

10 files changed

Lines changed: 60 additions & 38 deletions

File tree

backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ public long rtHandle() {
3535
return runtime.getHandle();
3636
}
3737

38-
public static native void clearHashTable(long hashTableData);
38+
public static native void clearHashTable(String cacheKey, long hashTableData);
3939

40-
public static native long cloneHashTable(long hashTableData);
40+
public static native long cloneHashTable(String cacheKey, long hashTableData);
4141

4242
public native long nativeBuild(
4343
String buildHashTableId,

backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ case class BroadcastHashJoinExecTransformer(
106106
isNullAwareAntiJoin) {
107107

108108
// Unique ID for built table
109-
lazy val buildBroadcastTableId: String = buildPlan.id.toString
109+
lazy val buildBroadcastTableId: String = canonicalBuildHashTableId(buildPlan)
110110

111111
override protected lazy val substraitJoinType: JoinRel.JoinType = joinType match {
112112
case _: InnerLike =>

backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ object VeloxBroadcastBuildSideCache
103103
}
104104
}
105105

106-
HashJoinBuilder.clearHashTable(value.pointer)
106+
HashJoinBuilder.clearHashTable(key, value.pointer)
107107
}
108108
}
109109
}

backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ case class ColumnarBuildSideRelation(
234234

235235
(hashTableData, this)
236236
} else {
237-
(HashJoinBuilder.cloneHashTable(hashTableData), null)
237+
(HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null)
238238
}
239239
}
240240

backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ class UnsafeColumnarBuildSideRelation(
204204

205205
(hashTableData, this)
206206
} else {
207-
(HashJoinBuilder.cloneHashTable(hashTableData), null)
207+
(HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null)
208208
}
209209
}
210210

cpp/velox/jni/VeloxJniWrapper.cc

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include "velox/common/base/BloomFilter.h"
4747
#include "velox/common/file/FileSystems.h"
4848
#include "velox/exec/HashTable.h"
49+
#include "velox/exec/HashTableCache.h"
4950

5051
#ifdef GLUTEN_ENABLE_GPU
5152
#include "cudf/CudfPlanValidator.h"
@@ -1060,6 +1061,12 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native
10601061
nullptr);
10611062
builder->setHashTable(std::move(mainTable));
10621063

1064+
auto* cache = facebook::velox::exec::HashTableCache::instance();
1065+
1066+
if (!cache->hasTable(hashTableId)) {
1067+
cache->add(hashTableId, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool());
1068+
}
1069+
10631070
return gluten::getHashTableObjStore()->save(builder);
10641071
}
10651072

@@ -1138,30 +1145,50 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native
11381145
}
11391146

11401147
hashTableBuilders[0]->setHashTable(std::move(mainTable));
1148+
1149+
auto* cache = facebook::velox::exec::HashTableCache::instance();
1150+
if (!cache->hasTable(hashTableId)) {
1151+
cache->add(
1152+
hashTableId,
1153+
hashTableBuilders[0]->hashTable(),
1154+
hashTableBuilders[0]->joinHasNullKeys(),
1155+
defaultLeafVeloxMemoryPool());
1156+
}
1157+
11411158
return gluten::getHashTableObjStore()->save(hashTableBuilders[0]);
11421159
JNI_METHOD_END(kInvalidObjectHandle)
11431160
}
11441161

11451162
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_cloneHashTable( // NOLINT
11461163
JNIEnv* env,
11471164
jclass,
1165+
jstring cacheKey,
11481166
jlong tableHandler) {
11491167
JNI_METHOD_START
1168+
auto cacheKeyStr = jStringToCString(env, cacheKey);
11501169
auto hashTableHandler = ObjectStore::retrieve<gluten::HashTableBuilder>(tableHandler);
1170+
auto* cache = facebook::velox::exec::HashTableCache::instance();
1171+
if (!cache->hasTable(cacheKeyStr)) {
1172+
cache->add(
1173+
cacheKeyStr, hashTableHandler->hashTable(), hashTableHandler->joinHasNullKeys(), defaultLeafVeloxMemoryPool());
1174+
}
1175+
11511176
return gluten::getHashTableObjStore()->save(hashTableHandler);
11521177
JNI_METHOD_END(kInvalidObjectHandle)
11531178
}
11541179

11551180
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_clearHashTable( // NOLINT
11561181
JNIEnv* env,
11571182
jclass,
1183+
jstring cacheKey,
11581184
jlong tableHandler) {
11591185
JNI_METHOD_START
1160-
auto hashTableHandler = ObjectStore::retrieve<gluten::HashTableBuilder>(tableHandler);
1161-
hashTableHandler->hashTable()->clear(true);
1186+
auto cacheKeyStr = jStringToCString(env, cacheKey);
1187+
facebook::velox::exec::HashTableCache::instance()->drop(cacheKeyStr);
11621188
ObjectStore::release(tableHandler);
11631189
JNI_METHOD_END()
11641190
}
1191+
11651192
#ifdef __cplusplus
11661193
}
11671194
#endif

cpp/velox/substrait/SubstraitToVeloxPlan.cc

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -457,26 +457,8 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
457457
} else if (
458458
sJoin.has_advanced_extension() &&
459459
SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isBHJ=")) {
460-
std::string hashTableId = sJoin.hashtableid();
461-
462-
std::shared_ptr<core::OpaqueHashTable> opaqueSharedHashTable = nullptr;
463-
bool joinHasNullKeys = false;
464-
465-
try {
466-
auto hashTableBuilder = ObjectStore::retrieve<gluten::HashTableBuilder>(getJoin(hashTableId));
467-
joinHasNullKeys = hashTableBuilder->joinHasNullKeys();
468-
auto originalShared = hashTableBuilder->hashTable();
469-
opaqueSharedHashTable = std::shared_ptr<core::OpaqueHashTable>(
470-
originalShared, reinterpret_cast<core::OpaqueHashTable*>(originalShared.get()));
471-
472-
LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId;
473-
} catch (const std::exception& e) {
474-
LOG(WARNING)
475-
<< "Error retrieving HashTable from ObjectStore: " << e.what()
476-
<< ". Falling back to building new table. To ensure correct results, please verify that spark.gluten.velox.buildHashTableOncePerExecutor.enabled is set to false.";
477-
opaqueSharedHashTable = nullptr;
478-
}
479-
460+
const auto& hashTableId = sJoin.hashtableid();
461+
const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId;
480462
// Create HashJoinNode node
481463
return std::make_shared<core::HashJoinNode>(
482464
nextPlanNodeId(),
@@ -488,14 +470,14 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
488470
leftNode,
489471
rightNode,
490472
getJoinOutputType(leftNode, rightNode, joinType),
473+
true,
491474
false,
492-
false,
493-
joinHasNullKeys,
494-
opaqueSharedHashTable);
475+
sJoin.hashtableid());
495476
} else {
496477
// Create HashJoinNode node
478+
auto joinNodeId = nextPlanNodeId();
497479
return std::make_shared<core::HashJoinNode>(
498-
nextPlanNodeId(),
480+
joinNodeId,
499481
joinType,
500482
isNullAwareAntiJoin,
501483
leftKeys,

ep/build-velox/src/get-velox.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
set -exu
1818

1919
CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
20-
VELOX_REPO=https://github.com/IBM/velox.git
21-
VELOX_BRANCH=dft-2026_06_12
22-
VELOX_ENHANCED_BRANCH=ibm-2026_06_12
20+
VELOX_REPO=https://github.com/JkSelf/velox.git
21+
VELOX_BRANCH=dft-2026_06_12-hashtable-cache
22+
VELOX_ENHANCED_BRANCH=ibm-2026_06_12-hashtable-cache
2323
VELOX_HOME=""
2424
RUN_SETUP_SCRIPT=ON
2525
ENABLE_ENHANCED_FEATURES=OFF

gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide
3131
import org.apache.spark.sql.catalyst.plans._
3232
import org.apache.spark.sql.catalyst.plans.physical._
3333
import org.apache.spark.sql.execution.{ExpandOutputPartitioningShim, ExplainUtils, SparkPlan}
34+
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
35+
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
3436
import org.apache.spark.sql.execution.joins.{BaseJoinExec, HashedRelationBroadcastMode, HashJoin}
3537
import org.apache.spark.sql.execution.metric.SQLMetric
3638
import org.apache.spark.sql.types._
@@ -105,6 +107,15 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport {
105107
(right, left)
106108
}
107109

110+
protected def canonicalBuildHashTableId(plan: SparkPlan): String = plan match {
111+
case b: BroadcastQueryStageExec =>
112+
canonicalBuildHashTableId(b.plan)
113+
case r: ReusedExchangeExec =>
114+
canonicalBuildHashTableId(r.child)
115+
case other =>
116+
other.id.toString
117+
}
118+
108119
def sameType(from: DataType, to: DataType): Boolean = {
109120
(from, to) match {
110121
case (ArrayType(fromElement, _), ArrayType(toElement, _)) =>
@@ -267,7 +278,7 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport {
267278
inputBuildOutput,
268279
context,
269280
operatorId,
270-
buildPlan.id.toString
281+
canonicalBuildHashTableId(buildPlan)
271282
)
272283

273284
context.registerJoinParam(operatorId, joinParams)
@@ -392,7 +403,7 @@ abstract class BroadcastHashJoinExecTransformerBase(
392403
override def hashJoinType: JoinType = joinType
393404

394405
// Unique ID for built hash table
395-
lazy val buildHashTableId: String = "BuiltHashTable-" + buildPlan.id
406+
lazy val buildHashTableId: String = canonicalBuildHashTableId(buildPlan)
396407

397408
override def genJoinParametersInternal(): (Int, Int, String) = {
398409
(1, if (isNullAwareAntiJoin) 1 else 0, buildHashTableId)

gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
522522
withSQLConf(
523523
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
524524
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000",
525-
SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") {
525+
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName,
526+
SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false"
527+
) {
526528
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
527529
"SELECT a FROM testData join testData2 ON key = a " +
528530
"where value >= (" +

0 commit comments

Comments
 (0)