Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion include/dbscan/algo.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,14 @@ int DBSCAN(intT n, floatT* PF, double epsilon, intT minPts, bool* coreFlagOut, i

typedef kdTree<dim, pointT> treeT;
auto trees = newA(treeT*, G->numCell());
parallel_for(0, G->numCell(), [&](intT i) {trees[i] = NULL;});

parallel_for(0, G->numCell(), [&](intT i) {
if (ccFlag[i]) {
trees[i] = new treeT(G->getCell(i)->getItem(), G->getCell(i)->size(), false);
Copy link
Copy Markdown
Owner

@wangyiqiu wangyiqiu Jun 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Major leak: hasEdge duplicates some new treeT objects without removing them. Fix - preallocate them. Note - I had trouble using locks here.

Thank you. In hasEdge, the trees were allocated on demand to save memory in case they are not required. How are the trees getting duplicated without being removed? From concurrent calls to hasEdge?

Copy link
Copy Markdown
Contributor Author

@John-194 John-194 Jun 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what was happening was that in hasEdge

  if (!trees[n1])
    trees[n1] = new treeT(cells[n1].getItem(), cells[n1].size(), false);//todo allocation, parallel
  if (!trees[n2]) 
    trees[n2] = new treeT(cells[n2].getItem(), cells[n2].size(), false);//todo allocation, parallel

while one thread is still constructing a new treeT, another thread sees that trees[x] is empty, so it tries to do the same. Both threads then assign their new treeT to the same index, and one of the two objects leaks.

This fix can be improved to work as intended, but currently it works, and speed is improved due to threads not doing duplicate work, so I moved on to fixing other areas.

} else {
trees[i] = NULL;
}
});

// auto degCmp = [&](intT i, intT j) {
// return G->getCell(i)->size() < G->getCell(j)->size();
Expand Down
55 changes: 40 additions & 15 deletions include/dbscan/grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#pragma once

#include <mutex>
#include "cell.h"
#include "point.h"
#include "shared.h"
Expand Down Expand Up @@ -77,6 +78,7 @@ struct grid {
treeT* tree=NULL;
intT totalPoints;
cellBuf **nbrCache;
std::mutex* cacheLocks;

/**
* Grid constructor.
Expand All @@ -89,10 +91,12 @@ struct grid {

cells = newA(cellT, cellCapacity);
nbrCache = newA(cellBuf*, cellCapacity);
cacheLocks = (std::mutex*) malloc(cellCapacity * sizeof(std::mutex));
parallel_for(0, cellCapacity, [&](intT i) {
nbrCache[i] = NULL;
cells[i].init();
});
new (&cacheLocks[i]) std::mutex();
nbrCache[i] = NULL;
cells[i].init();
});
numCells = 0;

myHash = new cellHashT(pMinn, r);
Expand All @@ -101,9 +105,10 @@ struct grid {

~grid() {
free(cells);
parallel_for(0, numCells, [&](intT i) {
if(nbrCache[i]) delete nbrCache[i];
});
free(cacheLocks);
parallel_for(0, cellCapacity, [&](intT i) {
if(nbrCache[i]) delete nbrCache[i];
});
free(nbrCache);
if(myHash) delete myHash;
if(table) {
Expand Down Expand Up @@ -141,14 +146,24 @@ struct grid {
}
}
return false;};//todo, optimize
if (nbrCache[bait-cells]) {
auto accum = nbrCache[bait-cells];
int idx = bait - cells;
if (nbrCache[idx]) {
auto accum = nbrCache[idx];
for (auto accum_i : *accum) {
if(fWrap(accum_i)) break;
}
} else {
floatT hop = sqrt(dim + 3) * 1.0000001;
nbrCache[bait-cells] = tree->rangeNeighbor(bait, r * hop, fStop, fWrap, true, nbrCache[bait-cells]);
// wait for other threads to do their thing then try again
std::lock_guard<std::mutex> lock(cacheLocks[idx]);
Copy link
Copy Markdown
Owner

@wangyiqiu wangyiqiu Jun 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor leak: Multiple threads cache the same data in rangeNeighbor at once, overwriting each other. Fix - the use of Locks.

Thank you. Did you notice any performance degradation as a result of using mutex?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not notice a change in performance. Threads should now be doing less unnecessary work, which should offset the cost of the mutex.

if (nbrCache[idx]) {
auto accum = nbrCache[idx];
for (auto accum_i : *accum) {
if (fWrap(accum_i)) break;
}
} else {
floatT hop = sqrt(dim + 3) * 1.0000001;
nbrCache[idx] = tree->rangeNeighbor(bait, r * hop, fStop, fWrap, true, nbrCache[idx]);
}
}
}

Expand All @@ -160,14 +175,24 @@ struct grid {
return f(cell);
return false;
};
if (nbrCache[bait-cells]) {
auto accum = nbrCache[bait-cells];
int idx = bait - cells;
if (nbrCache[idx]) {
auto accum = nbrCache[idx];
for (auto accum_i : *accum) {
if(fWrap(accum_i)) break;
if (fWrap(accum_i)) break;
}
} else {
floatT hop = sqrt(dim + 3) * 1.0000001;
nbrCache[bait-cells] = tree->rangeNeighbor(bait, r * hop, fStop, fWrap, true, nbrCache[bait-cells]);
// wait for other threads to do their thing then try again
std::lock_guard<std::mutex> lock(cacheLocks[idx]);
if (nbrCache[idx]) {
auto accum = nbrCache[idx];
for (auto accum_i : *accum) {
if (fWrap(accum_i)) break;
}
} else {
floatT hop = sqrt(dim + 3) * 1.0000001;
nbrCache[bait-cells] = tree->rangeNeighbor(bait, r * hop, fStop, fWrap, true, nbrCache[idx]);
}
}
}

Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def initialize_options(self):
depends=depends,
py_limited_api=True,
define_macros=[
('Py_LIMITED_API', '0x03020000'),
('NPY_NO_DEPRECATED_API', 'NPY_1_7_API_VERSION'),
# ('DBSCAN_VERSION', json.dumps(version)),
]
Expand Down
36 changes: 32 additions & 4 deletions src/dbscanmodule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,27 @@
#include "dbscan/pbbs/parallel.h"


Copy link
Copy Markdown
Owner

@wangyiqiu wangyiqiu Jun 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for fixing this!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem! I also have a fix for Arm CPU crashing, which I will upload later. There is also a very rare segmentation fault (~1 in a million chance) that I am looking into.

static bool scheduler_initialized = false;
static PyObject* scheduler_cleanup_weakref = nullptr;

static void cleanup_scheduler(PyObject *capsule)
{
if (scheduler_initialized)
{
parlay::internal::stop_scheduler();
scheduler_initialized = false;
}
}

static void ensure_scheduler_initialized()
{
if (!scheduler_initialized)
{
parlay::internal::start_scheduler();
scheduler_initialized = true;
}
}

static PyObject* DBSCAN_py(PyObject* self, PyObject* args, PyObject *kwargs)
{
PyObject *Xobj;
Expand Down Expand Up @@ -58,7 +79,7 @@ static PyObject* DBSCAN_py(PyObject* self, PyObject* args, PyObject *kwargs)
PyArrayObject* core_samples = (PyArrayObject*)PyArray_SimpleNew(1, &n, NPY_BOOL);
PyArrayObject* labels = (PyArrayObject*)PyArray_SimpleNew(1, &n, NPY_INT);

parlay::internal::start_scheduler();
ensure_scheduler_initialized();

DBSCAN(
dim,
Expand All @@ -70,9 +91,11 @@ static PyObject* DBSCAN_py(PyObject* self, PyObject* args, PyObject *kwargs)
(int*)PyArray_DATA(labels)
);

parlay::internal::stop_scheduler();

return PyTuple_Pack(2, labels, core_samples);
PyObject* result_tuple = PyTuple_Pack(2, labels, core_samples);
Py_DECREF(X);
Py_DECREF(core_samples);
Py_DECREF(labels);
return result_tuple;
}

PyDoc_STRVAR(doc_DBSCAN,
Expand Down Expand Up @@ -126,6 +149,11 @@ PyInit__dbscan(void)
#endif
PyModule_AddIntMacro(module, DBSCAN_MIN_DIMS);
PyModule_AddIntMacro(module, DBSCAN_MAX_DIMS);
PyObject *capsule = PyCapsule_New((void *)module, "dbscan.scheduler", cleanup_scheduler);
if (capsule != NULL)
{
PyModule_AddObject(module, "_scheduler_capsule", capsule);
}

return module;
}
Loading