Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public long rtHandle() {
return runtime.getHandle();
}

public static native void clearHashTable(long hashTableData);
public static native void clearHashTable(String cacheKey, long hashTableData);

public static native long cloneHashTable(long hashTableData);
public static native long cloneHashTable(String cacheKey, long hashTableData);

public native long nativeBuild(
String buildHashTableId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ case class BroadcastHashJoinExecTransformer(
isNullAwareAntiJoin) {

// Unique ID for built table
lazy val buildBroadcastTableId: String = buildPlan.id.toString
lazy val buildBroadcastTableId: String = canonicalBuildHashTableId(buildPlan)

override protected lazy val substraitJoinType: JoinRel.JoinType = joinType match {
case _: InnerLike =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ object VeloxBroadcastBuildSideCache
}
}

HashJoinBuilder.clearHashTable(value.pointer)
HashJoinBuilder.clearHashTable(key, value.pointer)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ case class ColumnarBuildSideRelation(

(hashTableData, this)
} else {
(HashJoinBuilder.cloneHashTable(hashTableData), null)
(HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class UnsafeColumnarBuildSideRelation(

(hashTableData, this)
} else {
(HashJoinBuilder.cloneHashTable(hashTableData), null)
(HashJoinBuilder.cloneHashTable(broadcastContext.buildHashTableId, hashTableData), null)
}
}

Expand Down
31 changes: 29 additions & 2 deletions cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "velox/common/base/BloomFilter.h"
#include "velox/common/file/FileSystems.h"
#include "velox/exec/HashTable.h"
#include "velox/exec/HashTableCache.h"

#ifdef GLUTEN_ENABLE_GPU
#include "cudf/CudfPlanValidator.h"
Expand Down Expand Up @@ -1060,6 +1061,12 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native
nullptr);
builder->setHashTable(std::move(mainTable));

auto* cache = facebook::velox::exec::HashTableCache::instance();

if (!cache->hasTable(hashTableId)) {
cache->injectTable(hashTableId, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool());
}

return gluten::getHashTableObjStore()->save(builder);
}

Expand Down Expand Up @@ -1138,30 +1145,50 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_native
}

hashTableBuilders[0]->setHashTable(std::move(mainTable));

auto* cache = facebook::velox::exec::HashTableCache::instance();
if (!cache->hasTable(hashTableId)) {
cache->injectTable(
hashTableId,
hashTableBuilders[0]->hashTable(),
hashTableBuilders[0]->joinHasNullKeys(),
defaultLeafVeloxMemoryPool());
}

return gluten::getHashTableObjStore()->save(hashTableBuilders[0]);
JNI_METHOD_END(kInvalidObjectHandle)
}

// JNI entry point backing HashJoinBuilder.cloneHashTable(String cacheKey, long hashTableData).
//
// Looks up the HashTableBuilder registered under `tableHandler`, makes sure the
// HashTableCache has an entry for `cacheKey` (injecting the builder's table when
// it does not), then saves the same builder into the hash-table object store
// and returns the handle produced by that save.
//
// NOTE(review): JNI_METHOD_END(kInvalidObjectHandle) presumably yields
// kInvalidObjectHandle when an exception is caught — confirm against the macro.
JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_cloneHashTable( // NOLINT
    JNIEnv* env,
    jclass,
    jstring cacheKey,
    jlong tableHandler) {
  JNI_METHOD_START
  auto key = jStringToCString(env, cacheKey);
  auto builder = ObjectStore::retrieve<gluten::HashTableBuilder>(tableHandler);

  // Only inject when nothing is cached under this key yet; an existing entry is left as is.
  auto* tableCache = facebook::velox::exec::HashTableCache::instance();
  const bool alreadyCached = tableCache->hasTable(key);
  if (!alreadyCached) {
    tableCache->injectTable(key, builder->hashTable(), builder->joinHasNullKeys(), defaultLeafVeloxMemoryPool());
  }

  return gluten::getHashTableObjStore()->save(builder);
  JNI_METHOD_END(kInvalidObjectHandle)
}

// JNI entry point backing HashJoinBuilder.clearHashTable(String cacheKey, long hashTableData).
//
// As written here it retrieves the HashTableBuilder registered under
// `tableHandler`, clears its hash table, drops the `cacheKey` entry from the
// HashTableCache, and finally releases `tableHandler` from the ObjectStore.
//
// NOTE(review): the table is cleared before the cache entry is dropped; confirm
// that no other holder of the cached table can still be reading it at that point.
JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_clearHashTable( // NOLINT
JNIEnv* env,
jclass,
jstring cacheKey,
jlong tableHandler) {
JNI_METHOD_START
auto hashTableHandler = ObjectStore::retrieve<gluten::HashTableBuilder>(tableHandler);
// The meaning of the `true` argument is not visible in this file — TODO confirm.
hashTableHandler->hashTable()->clear(true);
// Remove the cache entry stored under the same key the Java side passed in.
auto cacheKeyStr = jStringToCString(env, cacheKey);
facebook::velox::exec::HashTableCache::instance()->drop(cacheKeyStr);
// Release the object-store handle last, after the builder has been used above.
ObjectStore::release(tableHandler);
JNI_METHOD_END()
}

#ifdef __cplusplus
}
#endif
32 changes: 6 additions & 26 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -448,29 +448,11 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
} else if (
sJoin.has_advanced_extension() &&
SubstraitParser::configSetInOptimization(sJoin.advanced_extension(), "isBHJ=")) {
std::string hashTableId = sJoin.hashtableid();

std::shared_ptr<core::OpaqueHashTable> opaqueSharedHashTable = nullptr;
bool joinHasNullKeys = false;

try {
auto hashTableBuilder = ObjectStore::retrieve<gluten::HashTableBuilder>(getJoin(hashTableId));
joinHasNullKeys = hashTableBuilder->joinHasNullKeys();
auto originalShared = hashTableBuilder->hashTable();
opaqueSharedHashTable = std::shared_ptr<core::OpaqueHashTable>(
originalShared, reinterpret_cast<core::OpaqueHashTable*>(originalShared.get()));

LOG(INFO) << "Successfully retrieved and aliased HashTable for reuse. ID: " << hashTableId;
} catch (const std::exception& e) {
LOG(WARNING)
<< "Error retrieving HashTable from ObjectStore: " << e.what()
<< ". Falling back to building new table. To ensure correct results, please verify that spark.gluten.velox.buildHashTableOncePerExecutor.enabled is set to false.";
opaqueSharedHashTable = nullptr;
}

const auto& hashTableId = sJoin.hashtableid();
const auto joinNodeId = hashTableId.empty() ? nextPlanNodeId() : hashTableId;
// Create HashJoinNode node
return std::make_shared<core::HashJoinNode>(
nextPlanNodeId(),
joinNodeId,
joinType,
isNullAwareAntiJoin,
leftKeys,
Expand All @@ -479,14 +461,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
leftNode,
rightNode,
getJoinOutputType(leftNode, rightNode, joinType),
false,
false,
joinHasNullKeys,
opaqueSharedHashTable);
true);
} else {
// Create HashJoinNode node
auto joinNodeId = nextPlanNodeId();
return std::make_shared<core::HashJoinNode>(
nextPlanNodeId(),
joinNodeId,
joinType,
isNullAwareAntiJoin,
leftKeys,
Expand Down
6 changes: 3 additions & 3 deletions ep/build-velox/src/get-velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
set -exu

CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd)
VELOX_REPO=https://github.com/IBM/velox.git
VELOX_BRANCH=dft-2026_05_27
VELOX_ENHANCED_BRANCH=ibm-2026_05_27
VELOX_REPO=https://github.com/JkSelf/velox.git
VELOX_BRANCH=dft-2026_05_27-hashtable-cache
VELOX_ENHANCED_BRANCH=ibm-2026_05_27-hashtable-cache
VELOX_HOME=""
RUN_SETUP_SCRIPT=ON
ENABLE_ENHANCED_FEATURES=OFF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{ExpandOutputPartitioningShim, ExplainUtils, SparkPlan}
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.joins.{BaseJoinExec, HashedRelationBroadcastMode, HashJoin}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -105,6 +107,15 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport {
(right, left)
}

/**
 * Resolves the ID used to identify the hash table built from `plan`.
 *
 * A [[BroadcastQueryStageExec]] is replaced by its `plan` and a [[ReusedExchangeExec]] by its
 * `child`, repeatedly, until a node of neither type is reached; that node's `id` is returned
 * as a string.
 *
 * NOTE(review): presumably this lets joins that share a reused broadcast exchange resolve to
 * the same ID — confirm against the callers.
 *
 * @param plan the build-side plan to resolve
 * @return the `id` of the first unwrapped plan node, as a string
 */
protected def canonicalBuildHashTableId(plan: SparkPlan): String = {
  plan match {
    case queryStage: BroadcastQueryStageExec =>
      canonicalBuildHashTableId(queryStage.plan)
    case reusedExchange: ReusedExchangeExec =>
      canonicalBuildHashTableId(reusedExchange.child)
    case _ =>
      plan.id.toString
  }
}

def sameType(from: DataType, to: DataType): Boolean = {
(from, to) match {
case (ArrayType(fromElement, _), ArrayType(toElement, _)) =>
Expand Down Expand Up @@ -267,7 +278,7 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with TransformSupport {
inputBuildOutput,
context,
operatorId,
buildPlan.id.toString
canonicalBuildHashTableId(buildPlan)
)

context.registerJoinParam(operatorId, joinParams)
Expand Down Expand Up @@ -392,7 +403,7 @@ abstract class BroadcastHashJoinExecTransformerBase(
override def hashJoinType: JoinType = joinType

// Unique ID for built hash table
lazy val buildHashTableId: String = "BuiltHashTable-" + buildPlan.id
lazy val buildHashTableId: String = canonicalBuildHashTableId(buildPlan)

override def genJoinParametersInternal(): (Int, Int, String) = {
(1, if (isNullAwareAntiJoin) 1 else 0, buildHashTableId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000",
SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false") {
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName,
SQLConf.SUBQUERY_REUSE_ENABLED.key -> "false"
) {
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT a FROM testData join testData2 ON key = a " +
"where value >= (" +
Expand Down
Loading