Skip to content

Commit 1ce1be2

Browse files
fixes
1 parent 7a81e29 commit 1ce1be2

4 files changed

Lines changed: 392 additions & 27 deletions

File tree

cpp/daal/src/algorithms/hdbscan/hdbscan_dense_batch_impl.i

Lines changed: 94 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ services::Status HDBSCANBatchKernel<algorithmFPType, method, cpu>::compute(const
336336

337337
// Build MST using Prim's algorithm with MRD weights.
338338
// O(N^2) time using dense distance matrix, O(N) extra space.
339+
// Inner loops are parallelized via static_threader_for for large N.
339340
{
340341
daal::services::internal::TArray<algorithmFPType, cpu> minEdgeArr(nRows);
341342
daal::services::internal::TArray<int, cpu> minFromArr(nRows);
@@ -366,38 +367,107 @@ services::Status HDBSCANBatchKernel<algorithmFPType, method, cpu>::compute(const
366367
minEdge[j] = d;
367368
}
368369

369-
for (size_t e = 0; e < edgeCount; e++)
370+
const bool useParallelMst = (nRows >= 2048);
371+
372+
if (useParallelMst)
370373
{
371-
// Find minimum edge to non-MST node
372-
int best = -1;
373-
algorithmFPType bestW = std::numeric_limits<algorithmFPType>::max();
374-
for (size_t j = 0; j < nRows; j++)
374+
// Parallel Prim's: use static_threader_for for inner loops.
375+
// static_threader_for assigns contiguous index ranges per thread,
376+
// preserving deterministic tie-breaking (lowest index wins).
377+
const size_t nThreads = daal::threader_get_threads_number();
378+
379+
daal::services::internal::TArray<algorithmFPType, cpu> threadBestWArr(nThreads);
380+
daal::services::internal::TArray<int, cpu> threadBestIdxArr(nThreads);
381+
algorithmFPType * threadBestW = threadBestWArr.get();
382+
int * threadBestIdx = threadBestIdxArr.get();
383+
DAAL_CHECK_MALLOC(threadBestW);
384+
DAAL_CHECK_MALLOC(threadBestIdx);
385+
386+
for (size_t e = 0; e < edgeCount; e++)
375387
{
376-
if (!inMst[j] && minEdge[j] < bestW)
388+
// Phase 1: Parallel min-find — each thread finds local minimum
389+
for (size_t t = 0; t < nThreads; t++)
377390
{
378-
bestW = minEdge[j];
379-
best = static_cast<int>(j);
391+
threadBestW[t] = std::numeric_limits<algorithmFPType>::max();
392+
threadBestIdx[t] = -1;
380393
}
381-
}
382394

383-
mstFrom[e] = minFrom[best];
384-
mstTo[e] = best;
385-
mstWeights[e] = bestW;
386-
inMst[best] = 1;
395+
daal::static_threader_for(nRows, [&](size_t j, size_t tid) {
396+
if (!inMst[j] && minEdge[j] < threadBestW[tid])
397+
{
398+
threadBestW[tid] = minEdge[j];
399+
threadBestIdx[tid] = static_cast<int>(j);
400+
}
401+
});
387402

388-
// Update min_edge for remaining nodes
389-
const algorithmFPType * bestRow = distMatrix + static_cast<size_t>(best) * nRows;
390-
const algorithmFPType coreBest = coreDistances[best];
391-
for (size_t j = 0; j < nRows; j++)
403+
// Merge thread-local results (iterate in tid order for deterministic tie-breaking)
404+
int best = -1;
405+
algorithmFPType bestW = std::numeric_limits<algorithmFPType>::max();
406+
for (size_t t = 0; t < nThreads; t++)
407+
{
408+
if (threadBestIdx[t] >= 0 && threadBestW[t] < bestW)
409+
{
410+
bestW = threadBestW[t];
411+
best = threadBestIdx[t];
412+
}
413+
}
414+
415+
mstFrom[e] = minFrom[best];
416+
mstTo[e] = best;
417+
mstWeights[e] = bestW;
418+
inMst[best] = 1;
419+
420+
// Phase 2: Parallel update of min_edge for remaining nodes
421+
const algorithmFPType * bestRow = distMatrix + static_cast<size_t>(best) * nRows;
422+
const algorithmFPType coreBest = coreDistances[best];
423+
424+
daal::static_threader_for(nRows, [&](size_t j, size_t /*tid*/) {
425+
if (inMst[j]) return;
426+
algorithmFPType d = bestRow[j];
427+
d = (coreBest > d) ? coreBest : d;
428+
d = (coreDistances[j] > d) ? coreDistances[j] : d;
429+
if (d < minEdge[j])
430+
{
431+
minEdge[j] = d;
432+
minFrom[j] = best;
433+
}
434+
});
435+
}
436+
}
437+
else
438+
{
439+
// Serial Prim's for small N (parallelization overhead not worth it)
440+
for (size_t e = 0; e < edgeCount; e++)
392441
{
393-
if (inMst[j]) continue;
394-
algorithmFPType d = bestRow[j];
395-
d = (coreBest > d) ? coreBest : d;
396-
d = (coreDistances[j] > d) ? coreDistances[j] : d;
397-
if (d < minEdge[j])
442+
int best = -1;
443+
algorithmFPType bestW = std::numeric_limits<algorithmFPType>::max();
444+
for (size_t j = 0; j < nRows; j++)
445+
{
446+
if (!inMst[j] && minEdge[j] < bestW)
447+
{
448+
bestW = minEdge[j];
449+
best = static_cast<int>(j);
450+
}
451+
}
452+
453+
mstFrom[e] = minFrom[best];
454+
mstTo[e] = best;
455+
mstWeights[e] = bestW;
456+
inMst[best] = 1;
457+
458+
const algorithmFPType * bestRow = distMatrix + static_cast<size_t>(best) * nRows;
459+
const algorithmFPType coreBest = coreDistances[best];
460+
for (size_t j = 0; j < nRows; j++)
398461
{
399-
minEdge[j] = d;
400-
minFrom[j] = best;
462+
if (inMst[j]) continue;
463+
algorithmFPType d = bestRow[j];
464+
d = (coreBest > d) ? coreBest : d;
465+
d = (coreDistances[j] > d) ? coreDistances[j] : d;
466+
if (d < minEdge[j])
467+
{
468+
minEdge[j] = d;
469+
minFrom[j] = best;
470+
}
401471
}
402472
}
403473
}

cpp/oneapi/dal/algo/hdbscan/backend/gpu/kernel_fp_impl.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,13 +248,14 @@ sycl::event kernels_fp<Float>::build_mst(sycl::queue& queue,
248248

249249
std::vector<Float> min_edge_vec(n, std::numeric_limits<Float>::max());
250250
std::vector<std::int32_t> min_from_vec(n, 0);
251-
std::vector<bool> in_mst_vec(n, false);
251+
// Use char instead of bool to avoid bit-packing overhead of vector<bool>
252+
std::vector<char> in_mst_vec(n, 0);
252253

253254
std::vector<std::int32_t> from_vec(edge_count);
254255
std::vector<std::int32_t> to_vec(edge_count);
255256
std::vector<Float> weight_vec(edge_count);
256257

257-
in_mst_vec[0] = true;
258+
in_mst_vec[0] = 1;
258259
for (std::int64_t j = 1; j < n; j++) {
259260
min_edge_vec[j] = mrd_ptr[j]; // row 0, column j
260261
}
@@ -273,7 +274,7 @@ sycl::event kernels_fp<Float>::build_mst(sycl::queue& queue,
273274
from_vec[e] = min_from_vec[best];
274275
to_vec[e] = best;
275276
weight_vec[e] = best_w;
276-
in_mst_vec[best] = true;
277+
in_mst_vec[best] = 1;
277278

278279
for (std::int64_t j = 0; j < n; j++) {
279280
if (in_mst_vec[j])
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Benchmark: sklearnex HDBSCAN (oneDAL backend) vs stock sklearn HDBSCAN.
4+
5+
Measures wall-clock time, ARI vs ground truth, ARI between implementations,
6+
and cluster counts across multiple dataset sizes and algorithm choices.
7+
8+
Usage:
9+
# Ensure sklearnex is built against patched oneDAL:
10+
# conda activate build_sklearnex
11+
# source __release_lnx/daal/latest/env/vars.sh
12+
# cd ../scikit-learn-intelex && python setup.py develop --no-deps
13+
#
14+
# Then run:
15+
python scripts/benchmark_sklearnex_hdbscan.py
16+
"""
17+
18+
import time
19+
import sys
20+
import warnings
21+
22+
import numpy as np
23+
from sklearn.datasets import make_blobs
24+
from sklearn.metrics import adjusted_rand_score
25+
from sklearn.cluster import HDBSCAN as SklearnHDBSCAN
26+
27+
try:
28+
from sklearnex.cluster import HDBSCAN as SklearnexHDBSCAN
29+
except ImportError:
30+
print("ERROR: sklearnex not available. Build it first:")
31+
print(" conda activate build_sklearnex")
32+
print(" cd scikit-learn-intelex && python setup.py develop --no-deps")
33+
sys.exit(1)
34+
35+
# Suppress sklearn convergence/future warnings
36+
warnings.filterwarnings("ignore")
37+
38+
# ---- Configuration ----
# Benchmark matrix: every (size, algorithm) pair is run N_RUNS times and the
# median wall-clock time is reported.
SIZES = [1000, 5000, 10000, 20000, 50000]  # dataset sizes (n_samples) to benchmark
N_FEATURES = 10  # dimensionality of the synthetic blob data
N_CENTERS = 10  # number of blob centers (ground-truth clusters)
MIN_CLUSTER_SIZE = 15  # HDBSCAN min_cluster_size, passed to both implementations
MIN_SAMPLES = 5  # HDBSCAN min_samples, passed to both implementations
N_RUNS = 3  # take median
ALGORITHMS = ["brute", "auto"]  # auto maps to kd_tree for euclidean in sklearnex
RANDOM_STATE = 42  # make_blobs seed so datasets are reproducible across runs
47+
48+
49+
def count_clusters(labels):
    """Number of distinct cluster labels, excluding the noise label (-1)."""
    distinct = set(labels)
    distinct.discard(-1)
    return len(distinct)
51+
52+
53+
def noise_fraction(labels):
    """Return the fraction of points labeled as noise (-1).

    Converts ``labels`` to an ndarray first: on a plain Python list the
    original expression ``labels == -1`` is a single scalar ``False`` (list
    vs. int comparison, not elementwise), so ``np.sum`` would silently
    report a noise fraction of 0.0. Also guards against empty input, which
    previously raised ZeroDivisionError.

    :param labels: array-like of integer cluster labels (-1 == noise)
    :return: float in [0, 1]; 0.0 for empty input
    """
    arr = np.asarray(labels)
    if arr.size == 0:
        return 0.0
    return float(np.sum(arr == -1) / arr.size)
55+
56+
57+
def bench_one(X, y_true, algorithm):
    """Time and score both HDBSCAN implementations on a single dataset.

    Fits stock sklearn and sklearnex HDBSCAN ``N_RUNS`` times each, keeping
    the median wall-clock time and the labels from the final fit, then
    computes ARI scores (vs. ground truth and between implementations),
    cluster counts and noise fractions.
    """

    def fit_timed(estimator_cls):
        # Run estimator_cls N_RUNS times: record every timing, keep the
        # labels produced by the last fit.
        elapsed = []
        labels = None
        for _ in range(N_RUNS):
            start = time.perf_counter()
            model = estimator_cls(
                min_cluster_size=MIN_CLUSTER_SIZE,
                min_samples=MIN_SAMPLES,
                algorithm=algorithm,
            )
            model.fit(X)
            elapsed.append(time.perf_counter() - start)
            labels = model.labels_
        return np.median(elapsed), labels

    t_sk, sk_labels = fit_timed(SklearnHDBSCAN)    # stock sklearn
    t_sx, sx_labels = fit_timed(SklearnexHDBSCAN)  # oneDAL-backed sklearnex

    # Guard against a zero-duration sklearnex timing.
    speedup = float("inf") if t_sx <= 0 else t_sk / t_sx

    return {
        "t_sk": t_sk,
        "t_sx": t_sx,
        "speedup": speedup,
        "ari_sk_true": adjusted_rand_score(y_true, sk_labels),
        "ari_sx_true": adjusted_rand_score(y_true, sx_labels),
        "ari_cross": adjusted_rand_score(sk_labels, sx_labels),
        "nc_sk": count_clusters(sk_labels),
        "nc_sx": count_clusters(sx_labels),
        "nf_sk": noise_fraction(sk_labels),
        "nf_sx": noise_fraction(sx_labels),
    }
113+
114+
115+
def main():
    """Run the full benchmark matrix and print a formatted comparison table.

    For each algorithm in ALGORITHMS and each dataset size in SIZES:
    generate a synthetic blob dataset, benchmark both implementations via
    bench_one(), and print one result row. Ends with a legend explaining
    every column.
    """
    print("=" * 110)
    print("HDBSCAN Benchmark: sklearnex (oneDAL) vs sklearn")
    print(f"Config: features={N_FEATURES}, centers={N_CENTERS}, "
          f"mcs={MIN_CLUSTER_SIZE}, ms={MIN_SAMPLES}, runs={N_RUNS}")
    print("=" * 110)

    for algorithm in ALGORITHMS:
        print(f"\n--- algorithm='{algorithm}' ---")
        # Column header for the per-size results table below.
        print(f"{'N':>7s} | {'sklearn':>9s} {'sklearnex':>9s} {'speedup':>8s} | "
              f"{'ARI(sk)':>7s} {'ARI(sx)':>7s} {'ARI(x)':>7s} | "
              f"{'cl_sk':>5s} {'cl_sx':>5s} {'noise_sk':>8s} {'noise_sx':>8s}")
        print("-" * 110)

        # Warmup run
        # Small fit for both implementations so one-time costs (library
        # initialization, caches) don't skew the first timed measurement.
        X_warm, _ = make_blobs(n_samples=200, n_features=N_FEATURES,
                               centers=3, random_state=0)
        SklearnHDBSCAN(min_cluster_size=5).fit(X_warm)
        SklearnexHDBSCAN(min_cluster_size=5).fit(X_warm)

        for n in SIZES:
            # Fresh dataset per size; fixed seed keeps runs reproducible.
            X, y_true = make_blobs(
                n_samples=n,
                n_features=N_FEATURES,
                centers=N_CENTERS,
                cluster_std=1.0,
                random_state=RANDOM_STATE,
            )

            r = bench_one(X, y_true, algorithm)

            print(
                f"{n:7d} | "
                f"{r['t_sk']:9.3f}s {r['t_sx']:9.3f}s {r['speedup']:7.1f}x | "
                f"{r['ari_sk_true']:7.4f} {r['ari_sx_true']:7.4f} {r['ari_cross']:7.4f} | "
                f"{r['nc_sk']:5d} {r['nc_sx']:5d} {r['nf_sk']:8.2%} {r['nf_sx']:8.2%}"
            )

    print("\n" + "=" * 110)
    print("Legend:")
    print(" sklearn = stock sklearn.cluster.HDBSCAN (median of 3 runs)")
    print(" sklearnex = sklearnex.cluster.HDBSCAN backed by oneDAL (median of 3 runs)")
    print(" speedup = sklearn_time / sklearnex_time")
    print(" ARI(sk) = Adjusted Rand Index of sklearn labels vs ground truth")
    print(" ARI(sx) = Adjusted Rand Index of sklearnex labels vs ground truth")
    print(" ARI(x) = Adjusted Rand Index between sklearn and sklearnex labels")
    print(" cl_sk/sx = number of clusters found")
    print(" noise_sk/sx = fraction of points labeled as noise")
    print("=" * 110)
164+
165+
166+
if __name__ == "__main__":
    # Run the benchmark only when executed as a script, not on import.
    main()

0 commit comments

Comments
 (0)