diff --git a/.github/workflows/test-bench.yml b/.github/workflows/test-bench.yml index 3db0f64..a88255c 100644 --- a/.github/workflows/test-bench.yml +++ b/.github/workflows/test-bench.yml @@ -53,7 +53,7 @@ jobs: - name: Run benchmarks run: | - python -m unittest discover -s tests -p 'test_bench_*.py' + python -m pytest tests/test_bench_*.py -s test-bench-job: needs: test-bench-matrix-job diff --git a/.github/workflows/test-unit.yml b/.github/workflows/test-unit.yml index c3f6bb1..af4530b 100644 --- a/.github/workflows/test-unit.yml +++ b/.github/workflows/test-unit.yml @@ -53,7 +53,7 @@ jobs: - name: Run tests and code coverage run: | - coverage run --source=src -m unittest discover -s tests -p 'test_unit_*.py' + coverage run --source=src -m pytest tests/test_unit_*.py coverage report -m - name: Upload coverage reports to Codecov diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 1565158..2c4e26e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -67,17 +67,17 @@ Follow these steps to contribute: the naming convention `test_bench_*.py`. 4. **Run Tests**. - Ensure your changes pass all tests before committing. We use `unittest` as + Ensure your changes pass all tests before committing. We use `pytest` as test framework: ```bash - python -m unittest discover -s tests -p 'test_*.py' + python -m pytest tests/test_*.py ``` Before each commit make sure to check code coverage: ```bash - coverage run --source=src -m unittest discover -s tests -p 'test_*.py' + coverage run --source=src -m pytest tests/test_*.py ``` 5. **Commit and Push Your Changes**. diff --git a/app/streamlit_app.py b/app/streamlit_app.py index eb0c773..9637892 100644 --- a/app/streamlit_app.py +++ b/app/streamlit_app.py @@ -22,7 +22,6 @@ from sklearn.decomposition import PCA from umap import UMAP -from tdamapper._plot_plotly import _marker_size from tdamapper.core import aggregate_graph from tdamapper.cover import BallCover, CubicalCover from tdamapper.learn import MapperAlgorithm @@ -137,14 +136,14 @@ def _check_limits_mapper_graph(mapper_graph): if LIMITS_ENABLED: num_nodes = mapper_graph.number_of_nodes() if num_nodes > LIMITS_NUM_NODES: - logging.warn("Too many nodes.") + logging.warning("Too many nodes.") raise ValueError( "Too many nodes: select different parameters or run the app " "locally on your machine." ) num_edges = mapper_graph.number_of_edges() if num_edges > LIMITS_NUM_EDGES: - logging.warn("Too many edges.") + logging.warning("Too many edges.") raise ValueError( "Too many edges: select different parameters or run the app " "locally on your machine." @@ -155,14 +154,14 @@ def _check_limits_dataset(df_X, df_y): if LIMITS_ENABLED: num_samples = len(df_X) if num_samples > LIMITS_NUM_SAMPLES: - logging.warn("Dataset too big.") + logging.warning("Dataset too big.") raise ValueError( "Dataset too big: select a different dataset or run the app " "locally on your machine." ) num_features = len(df_X.columns) + len(df_y.columns) if num_features > LIMITS_NUM_FEATURES: - logging.warn("Too many features.") + logging.warning("Too many features.") raise ValueError( "Too many features: select a different dataset or run the app " "locally on your machine." @@ -529,8 +528,8 @@ def mapper_input_section(X): mapper_algo = MapperAlgorithm( cover=cover, clustering=clustering, - verbose=True, - n_jobs=1, + verbose=False, + n_jobs=-2, ) mapper_graph = compute_mapper(mapper_algo, X, lens) return mapper_graph @@ -628,11 +627,12 @@ def compute_mapper_fig(mapper_plot, colors, node_size, cmap, _agg, agg_name): colors, node_size=node_size, agg=_agg, - title=[f"{agg_name} of {c}" for c in colors.columns], + title=[f"{c}" for c in colors.columns], cmap=cmap, width=600, height=600, ) + logger.info("Done") return mapper_fig diff --git a/pyproject.toml b/pyproject.toml index e0473eb..e6025ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ dev = [ "coverage[toml]", "pandas", "scikit-learn<1.6.0", + "pytest", "black[jupyter]", "isort", "flake8", diff --git a/src/tdamapper/_plot_plotly.py b/src/tdamapper/_plot_plotly.py index db79a1f..25654b8 100644 --- a/src/tdamapper/_plot_plotly.py +++ b/src/tdamapper/_plot_plotly.py @@ -57,7 +57,7 @@ def plot_plotly( mapper_plot, width: int, height: int, - node_size: int = DEFAULT_NODE_SIZE, + node_size: Optional[Union[int, List[int]]] = DEFAULT_NODE_SIZE, colors=None, title: Optional[Union[str, List[str]]] = None, agg=np.nanmean, @@ -73,8 +73,9 @@ def plot_plotly( titles = [title for _ in range(colors_num)] elif isinstance(title, list) and len(title) == colors_num: titles = title - fig = _figure(mapper_plot, width, height, node_size, colors, titles, agg, cmaps) - _add_ui_to_layout(mapper_plot, fig, colors, titles, node_size, agg, cmaps) + node_sizes = [node_size] if isinstance(node_size, int) else node_size + fig = _figure(mapper_plot, width, height, node_sizes, colors, titles, agg, cmaps) + _add_ui_to_layout(mapper_plot, fig, colors, titles, node_sizes, agg, cmaps) return fig @@ -220,7 +221,7 @@ def _update_layout(fig, width, height): ) -def _figure(mapper_plot, width, height, node_size, colors, titles, agg, cmaps): +def _figure(mapper_plot, width, height, node_sizes, colors, titles, agg, cmaps): node_pos = mapper_plot.positions node_pos_arr = _node_pos_array( mapper_plot.graph, @@ -239,7 +240,7 @@ def _figure(mapper_plot, width, height, node_size, colors, titles, agg, cmaps): _set_cmap(mapper_plot, fig, cmaps[0]) _set_colors(mapper_plot, fig, colors[:, 0], agg) - _set_node_size(mapper_plot, fig, node_size) + _set_node_size(mapper_plot, fig, node_sizes[len(node_sizes) // 2]) _set_title(mapper_plot, fig, titles[0]) return fig @@ -387,7 +388,7 @@ def _layout(width, height): ) -def _add_ui_to_layout(mapper_plot, mapper_fig, colors, titles, node_size, agg, cmaps): +def _add_ui_to_layout(mapper_plot, mapper_fig, colors, titles, node_sizes, agg, cmaps): cmaps_plotly = [PLOTLY_CMAPS.get(c.lower()) for c in cmaps] menu_color = _ui_color(mapper_plot, colors, titles, agg) if menu_color["buttons"]: @@ -396,7 +397,7 @@ def _add_ui_to_layout(mapper_plot, mapper_fig, colors, titles, node_size, agg, c menu_color["x"] = -0.25 menu_cmap = _ui_cmap(mapper_plot, cmaps_plotly) menu_cmap["x"] = menu_color["x"] + 0.25 - slider_size = _ui_node_size(mapper_plot, node_size) + slider_size = _ui_node_size(mapper_plot, node_sizes) mapper_fig.update_layout( updatemenus=[menu_cmap, menu_color], sliders=[slider_size], @@ -441,7 +442,7 @@ def _update_cmap(cmap): ) -def _ui_node_size(mapper_plot, node_size): +def _ui_node_size(mapper_plot, node_sizes): steps = [ dict( method="restyle", @@ -451,7 +452,7 @@ def _ui_node_size(mapper_plot, node_size): [1], ], ) - for size in [node_size * x / 10.0 for x in range(1, 20)] + for size in node_sizes ] return dict( diff --git a/src/tdamapper/utils/vptree_flat/ball_search.py b/src/tdamapper/utils/vptree_flat/ball_search.py index 418375f..23acbc9 100644 --- a/src/tdamapper/utils/vptree_flat/ball_search.py +++ b/src/tdamapper/utils/vptree_flat/ball_search.py @@ -5,18 +5,18 @@ class BallSearch: def __init__(self, vpt, point, eps, inclusive=True): self._arr = vpt._get_arr() - self.__distance = vpt._get_distance() - self.__point = point - self.__eps = eps - self.__inclusive = inclusive + self._distance = vpt._get_distance() + self._point = point + self._eps = eps + self._inclusive = inclusive def search(self): return self._search_iter() def _inside(self, dist): - if self.__inclusive: - return dist <= self.__eps - return dist < self.__eps + if self._inclusive: + return dist <= self._eps + return dist < self._eps def _search_iter(self): stack = [(0, self._arr.size())] @@ -28,11 +28,11 @@ def _search_iter(self): is_terminal = self._arr.is_terminal(start) if is_terminal: for x in self._arr.get_points(start, end): - dist = self.__distance(self.__point, x) + dist = self._distance(self._point, x) if self._inside(dist): result.append(x) else: - dist = self.__distance(self.__point, v_point) + dist = self._distance(self._point, v_point) mid = _mid(start, end) if self._inside(dist): result.append(v_point) @@ -42,7 +42,7 @@ def _search_iter(self): else: fst = (mid, end) snd = (start + 1, mid) - if abs(dist - v_radius) <= self.__eps: + if abs(dist - v_radius) <= self._eps: stack.append(snd) stack.append(fst) return result diff --git a/src/tdamapper/utils/vptree_flat/builder.py b/src/tdamapper/utils/vptree_flat/builder.py index a19b1c1..6843e51 100644 --- a/src/tdamapper/utils/vptree_flat/builder.py +++ b/src/tdamapper/utils/vptree_flat/builder.py @@ -12,7 +12,7 @@ def _mid(start, end): class Builder: def __init__(self, vpt, X): - self.__distance = vpt._get_distance() + self._distance = vpt._get_distance() dataset = [x for x in X] indices = np.array([i for i in range(len(dataset))]) @@ -20,14 +20,14 @@ def __init__(self, vpt, X): is_terminal = np.array([False for _ in X]) self._arr = VPArray(dataset, distances, indices, is_terminal) - self.__leaf_capacity = vpt.get_leaf_capacity() - self.__leaf_radius = vpt.get_leaf_radius() + self._leaf_capacity = vpt.get_leaf_capacity() + self._leaf_radius = vpt.get_leaf_radius() pivoting = vpt.get_pivoting() - self.__pivoting = self._pivoting_disabled + self._pivoting = self._pivoting_disabled if pivoting == "random": - self.__pivoting = self._pivoting_random + self._pivoting = self._pivoting_random elif pivoting == "furthest": - self.__pivoting = self._pivoting_furthest + self._pivoting = self._pivoting_furthest def _pivoting_disabled(self, start, end): pass @@ -45,7 +45,7 @@ def _furthest(self, start, end, i): i_point = self._arr.get_point(i) for j in range(start, end): j_point = self._arr.get_point(j) - j_dist = self.__distance(i_point, j_point) + j_dist = self._distance(i_point, j_point) if j_dist > furthest_dist: furthest = j furthest_dist = j_dist @@ -61,12 +61,12 @@ def _pivoting_furthest(self, start, end): self._arr.swap(start, furthest) def _update(self, start, end): - self.__pivoting(start, end) + self._pivoting(start, end) v_point = self._arr.get_point(start) is_terminal = self._arr.is_terminal(start) for i in range(start + 1, end): point = self._arr.get_point(i) - self._arr.set_distance(i, self.__distance(v_point, point)) + self._arr.set_distance(i, self._distance(v_point, point)) self._arr.set_terminal(i, is_terminal) def build(self): @@ -81,8 +81,8 @@ def _build_iter(self): self._update(start, end) self._arr.partition(start + 1, end, mid) v_radius = self._arr.get_distance(mid) - if (end - start > 2 * self.__leaf_capacity) and ( - v_radius > self.__leaf_radius + if (end - start > 2 * self._leaf_capacity) and ( + v_radius > self._leaf_radius ): self._arr.set_distance(start, v_radius) self._arr.set_terminal(start, False) diff --git a/src/tdamapper/utils/vptree_flat/knn_search.py b/src/tdamapper/utils/vptree_flat/knn_search.py index 57b1d6c..3f7fa1d 100644 --- a/src/tdamapper/utils/vptree_flat/knn_search.py +++ b/src/tdamapper/utils/vptree_flat/knn_search.py @@ -1,41 +1,43 @@ from tdamapper.utils.heap import MaxHeap from tdamapper.utils.vptree_flat.common import _mid +_PRE = 0 +_POST = 1 + class KnnSearch: def __init__(self, vpt, point, neighbors): self._arr = vpt._get_arr() - self.__distance = vpt._get_distance() - self.__point = point - self.__neighbors = neighbors - self.__radius = float("inf") - self.__result = MaxHeap() + self._distance = vpt._get_distance() + self._point = point + self._neighbors = neighbors + self._radius = float("inf") + self._result = MaxHeap() def _get_items(self): - while len(self.__result) > self.__neighbors: - self.__result.pop() - return [x for (_, x) in self.__result] + while len(self._result) > self._neighbors: + self._result.pop() + return [x for (_, x) in self._result] def search(self): self._search_iter() return self._get_items() def _process(self, x): - dist = self.__distance(self.__point, x) - if dist >= self.__radius: + dist = self._distance(self._point, x) + if dist >= self._radius: return dist - self.__result.add(dist, x) - while len(self.__result) > self.__neighbors: - self.__result.pop() - if len(self.__result) == self.__neighbors: - self.__radius, _ = self.__result.top() + self._result.add(dist, x) + while len(self._result) > self._neighbors: + self._result.pop() + if len(self._result) == self._neighbors: + self._radius, _ = self._result.top() return dist def _search_iter(self): - PRE, POST = 0, 1 - self.__result = MaxHeap() - stack = [(0, self._arr.size(), 0.0, PRE)] + self._result = MaxHeap() + stack = [(0, self._arr.size(), 0.0, _PRE)] while stack: start, end, thr, action = stack.pop() @@ -47,7 +49,7 @@ def _search_iter(self): for x in self._arr.get_points(start, end): self._process(x) else: - if action == PRE: + if action == _PRE: mid = _mid(start, end) dist = self._process(v_point) if dist <= v_radius: @@ -56,9 +58,9 @@ def _search_iter(self): else: fst_start, fst_end = mid, end snd_start, snd_end = start + 1, mid - stack.append((snd_start, snd_end, abs(v_radius - dist), POST)) - stack.append((fst_start, fst_end, 0.0, PRE)) - elif action == POST: - if self.__radius > thr: - stack.append((start, end, 0.0, PRE)) + stack.append((snd_start, snd_end, abs(v_radius - dist), _POST)) + stack.append((fst_start, fst_end, 0.0, _PRE)) + elif action == _POST: + if self._radius > thr: + stack.append((start, end, 0.0, _PRE)) return self._get_items() diff --git a/src/tdamapper/utils/vptree_flat/vptree.py b/src/tdamapper/utils/vptree_flat/vptree.py index f157bbc..164addf 100755 --- a/src/tdamapper/utils/vptree_flat/vptree.py +++ b/src/tdamapper/utils/vptree_flat/vptree.py @@ -15,34 +15,34 @@ def __init__( leaf_radius=0.0, pivoting=None, ): - self.__metric = metric - self.__metric_params = metric_params - self.__leaf_capacity = leaf_capacity - self.__leaf_radius = leaf_radius - self.__pivoting = pivoting + self._metric = metric + self._metric_params = metric_params + self._leaf_capacity = leaf_capacity + self._leaf_radius = leaf_radius + self._pivoting = pivoting self._arr = Builder(self, X).build() def get_metric(self): - return self.__metric + return self._metric def get_metric_params(self): - return self.__metric_params + return self._metric_params def get_leaf_capacity(self): - return self.__leaf_capacity + return self._leaf_capacity def get_leaf_radius(self): - return self.__leaf_radius + return self._leaf_radius def get_pivoting(self): - return self.__pivoting + return self._pivoting def _get_arr(self): return self._arr def _get_distance(self): - metric_params = self.__metric_params or {} - return get_metric(self.__metric, **metric_params) + metric_params = self._metric_params or {} + return get_metric(self._metric, **metric_params) def ball_search(self, point, eps, inclusive=True): return BallSearch(self, point, eps, inclusive).search() diff --git a/src/tdamapper/utils/vptree_hier/ball_search.py b/src/tdamapper/utils/vptree_hier/ball_search.py index 1f519a3..fb4b7f7 100644 --- a/src/tdamapper/utils/vptree_hier/ball_search.py +++ b/src/tdamapper/utils/vptree_hier/ball_search.py @@ -1,40 +1,40 @@ class BallSearch: def __init__(self, vpt, point, eps, inclusive=True): - self.__tree = vpt._get_tree() + self._tree = vpt._get_tree() self._arr = vpt._get_arr() - self.__distance = vpt._get_distance() - self.__point = point - self.__eps = eps - self.__inclusive = inclusive - self.__result = [] + self._distance = vpt._get_distance() + self._point = point + self._eps = eps + self._inclusive = inclusive + self._result = [] def search(self): - self.__result.clear() - self._search_rec(self.__tree) - return self.__result + self._result.clear() + self._search_rec(self._tree) + return self._result def _inside(self, dist): - if self.__inclusive: - return dist <= self.__eps - return dist < self.__eps + if self._inclusive: + return dist <= self._eps + return dist < self._eps def _search_rec(self, tree): if tree.is_terminal(): start, end = tree.get_bounds() for x in self._arr.get_points(start, end): - dist = self.__distance(self.__point, x) + dist = self._distance(self._point, x) if self._inside(dist): - self.__result.append(x) + self._result.append(x) else: v_radius, v_point = tree.get_ball() - dist = self.__distance(v_point, self.__point) + dist = self._distance(v_point, self._point) if self._inside(dist): - self.__result.append(v_point) + self._result.append(v_point) if dist <= v_radius: fst, snd = tree.get_left(), tree.get_right() else: fst, snd = tree.get_right(), tree.get_left() self._search_rec(fst) - if abs(dist - v_radius) <= self.__eps: + if abs(dist - v_radius) <= self._eps: self._search_rec(snd) diff --git a/src/tdamapper/utils/vptree_hier/builder.py b/src/tdamapper/utils/vptree_hier/builder.py index 944b93e..a68eee4 100644 --- a/src/tdamapper/utils/vptree_hier/builder.py +++ b/src/tdamapper/utils/vptree_hier/builder.py @@ -2,28 +2,27 @@ import numpy as np -from tdamapper.utils.quickselect import quickselect, swap_all from tdamapper.utils.vptree_hier.common import Leaf, Node, VPArray, _mid class Builder: def __init__(self, vpt, X): - self.__distance = vpt._get_distance() + self._distance = vpt._get_distance() dataset = [x for x in X] indices = np.array([i for i in range(len(dataset))]) distances = np.array([0.0 for _ in X]) self._arr = VPArray(dataset, distances, indices) - self.__leaf_capacity = vpt.get_leaf_capacity() - self.__leaf_radius = vpt.get_leaf_radius() + self._leaf_capacity = vpt.get_leaf_capacity() + self._leaf_radius = vpt.get_leaf_radius() pivoting = vpt.get_pivoting() - self.__pivoting = self._pivoting_disabled + self._pivoting = self._pivoting_disabled if pivoting == "random": - self.__pivoting = self._pivoting_random + self._pivoting = self._pivoting_random elif pivoting == "furthest": - self.__pivoting = self._pivoting_furthest + self._pivoting = self._pivoting_furthest def _pivoting_disabled(self, start, end): pass @@ -41,7 +40,7 @@ def _furthest(self, start, end, i): i_point = self._arr.get_point(i) for j in range(start, end): j_point = self._arr.get_point(j) - j_dist = self.__distance(i_point, j_point) + j_dist = self._distance(i_point, j_point) if j_dist > furthest_dist: furthest = j furthest_dist = j_dist @@ -57,11 +56,11 @@ def _pivoting_furthest(self, start, end): self._arr.swap(start, furthest) def _update(self, start, end): - self.__pivoting(start, end) + self._pivoting(start, end) v_point = self._arr.get_point(start) for i in range(start + 1, end): point = self._arr.get_point(i) - self._arr.set_distance(i, self.__distance(v_point, point)) + self._arr.set_distance(i, self._distance(v_point, point)) def build(self): tree = self._build_rec(0, self._arr.size()) @@ -74,9 +73,7 @@ def _build_rec(self, start, end): self._arr.partition(start + 1, end, mid) v_radius = self._arr.get_distance(mid) self._arr.set_distance(start, v_radius) - if (end - start <= 2 * self.__leaf_capacity) or ( - v_radius <= self.__leaf_radius - ): + if (end - start <= 2 * self._leaf_capacity) or (v_radius <= self._leaf_radius): left = Leaf(start + 1, mid) right = Leaf(mid, end) else: diff --git a/src/tdamapper/utils/vptree_hier/knn_search.py b/src/tdamapper/utils/vptree_hier/knn_search.py index a653141..37bb053 100644 --- a/src/tdamapper/utils/vptree_hier/knn_search.py +++ b/src/tdamapper/utils/vptree_hier/knn_search.py @@ -4,43 +4,43 @@ class KnnSearch: def __init__(self, vpt, point, neighbors): - self.__tree = vpt._get_tree() + self._tree = vpt._get_tree() self._arr = vpt._get_arr() - self.__distance = vpt._get_distance() - self.__point = point - self.__neighbors = neighbors - self.__items = MaxHeap() + self._distance = vpt._get_distance() + self._point = point + self._neighbors = neighbors + self._items = MaxHeap() def _add(self, dist, x): - self.__items.add(dist, x) - if len(self.__items) > self.__neighbors: - self.__items.pop() + self._items.add(dist, x) + if len(self._items) > self._neighbors: + self._items.pop() def _get_items(self): - while len(self.__items) > self.__neighbors: - self.__items.pop() - return [x for (_, x) in self.__items] + while len(self._items) > self._neighbors: + self._items.pop() + return [x for (_, x) in self._items] def _get_radius(self): - if len(self.__items) < self.__neighbors: + if len(self._items) < self._neighbors: return float("inf") - furthest_dist, _ = self.__items.top() + furthest_dist, _ = self._items.top() return furthest_dist def search(self): - self._search_rec(self.__tree) + self._search_rec(self._tree) return self._get_items() def _search_rec(self, tree): if tree.is_terminal(): start, end = tree.get_bounds() for x in self._arr.get_points(start, end): - dist = self.__distance(self.__point, x) + dist = self._distance(self._point, x) if dist < self._get_radius(): self._add(dist, x) else: v_radius, v_point = tree.get_ball() - dist = self.__distance(v_point, self.__point) + dist = self._distance(v_point, self._point) if dist < self._get_radius(): self._add(dist, v_point) if dist <= v_radius: diff --git a/tests/test_bench_cover.py b/tests/test_bench_cover.py index 0f94ee3..ee3f4ac 100644 --- a/tests/test_bench_cover.py +++ b/tests/test_bench_cover.py @@ -1,6 +1,5 @@ import logging import time -import unittest import numpy as np from sklearn.datasets import load_digits @@ -26,61 +25,63 @@ def dist_proj(x, y): return dist(x[1:], x[1:]) -class TestVpSettings(unittest.TestCase): - - setup_logging() - logger = logging.getLogger(__name__) - - def cover(self, vpt, X, r): - covered_ids = set() - for i, xi in enumerate(X): - if i not in covered_ids: - neigh = vpt.ball_search(xi, r) - neigh_ids = [int(x[0]) for x in neigh] - covered_ids.update(neigh_ids) - if neigh_ids: - yield neigh_ids - - def run_bench(self, X, r, dist, vp, **kwargs): - XX = np.array([[i] + [xi for xi in x] for i, x in enumerate(X)]) - t0 = time.time() - vpt = vp(XX, metric=dist_proj, **kwargs) - list(self.cover(vpt, XX, r)) - t1 = time.time() - self.logger.info(f"time: {t1 - t0}") - - @profile(n_lines=20) - def test_cover_random(self): - for r in [1.0, 10.0, 100.0]: - for n in [100, 1000, 10000]: - self.logger.info("============ Cover Bench Random ==========") - self.logger.info(f"[n: {n}, r: {r}]") - X = dataset(num=n) - self.logger.info(">>>>>>> HVPT >>>>>>") - self.run_bench(X, r, dist, HVPT, leaf_radius=r, pivoting="random") - self.run_bench(X, r, dist, HVPT, leaf_radius=r, pivoting="furthest") - self.logger.info(">>>>>>> FVPT >>>>>>") - self.run_bench(X, r, dist, FVPT, leaf_radius=r, pivoting="random") - self.run_bench(X, r, dist, FVPT, leaf_radius=r, pivoting="furthest") - self.logger.info(">>>>>> SKBT >>>>>>") - self.run_bench(X, r, dist, SkBallTree) - self.run_bench(X, r, dist, SkBallTree, leaf_radius=r) - self.logger.info("") - - @profile(n_lines=20) - def test_cover_digits(self): - X, _ = load_digits(return_X_y=True) - # X = PCA(n_components=3).fit_transform(X) - for r in [1.0, 10.0, 100.0]: - self.logger.info("======= Cover Bench Digits =======") - self.logger.info(f"[r: {r}]") - self.logger.info(">>>>>>> HVPT >>>>>>") - self.run_bench(X, r, dist, HVPT, leaf_radius=r, pivoting="random") - self.run_bench(X, r, dist, HVPT, leaf_radius=r, pivoting="furthest") - self.logger.info(">>>>>>> FVPT >>>>>>") - self.run_bench(X, r, dist, FVPT, leaf_radius=r, pivoting="random") - self.run_bench(X, r, dist, FVPT, leaf_radius=r, pivoting="furthest") - self.logger.info(">>>>>> SKBT >>>>>>") - self.run_bench(X, r, dist, SkBallTree) - self.run_bench(X, r, dist, SkBallTree, leaf_radius=r) - self.logger.info("") +setup_logging() +logger = logging.getLogger(__name__) + + +def cover(vpt, X, r): + covered_ids = set() + for i, xi in enumerate(X): + if i not in covered_ids: + neigh = vpt.ball_search(xi, r) + neigh_ids = [int(x[0]) for x in neigh] + covered_ids.update(neigh_ids) + if neigh_ids: + yield neigh_ids + + +def run_bench(X, r, dist, vp, **kwargs): + XX = np.array([[i] + [xi for xi in x] for i, x in enumerate(X)]) + t0 = time.time() + vpt = vp(XX, metric=dist_proj, **kwargs) + list(cover(vpt, XX, r)) + t1 = time.time() + logger.info(f"time: {t1 - t0}") + + +@profile(n_lines=20) +def test_cover_random(): + for r in [1.0, 10.0, 100.0]: + for n in [100, 1000, 10000]: + logger.info("============ Cover Bench Random ==========") + logger.info(f"[n: {n}, r: {r}]") + X = dataset(num=n) + logger.info(">>>>>>> HVPT >>>>>>") + run_bench(X, r, dist, HVPT, leaf_radius=r, pivoting="random") + run_bench(X, r, dist, HVPT, leaf_radius=r, pivoting="furthest") + logger.info(">>>>>>> FVPT >>>>>>") + run_bench(X, r, dist, FVPT, leaf_radius=r, pivoting="random") + run_bench(X, r, dist, FVPT, leaf_radius=r, pivoting="furthest") + logger.info(">>>>>> SKBT >>>>>>") + run_bench(X, r, dist, SkBallTree) + run_bench(X, r, dist, SkBallTree, leaf_radius=r) + logger.info("") + + +@profile(n_lines=20) +def test_cover_digits(): + X, _ = load_digits(return_X_y=True) + # X = PCA(n_components=3).fit_transform(X) + for r in [1.0, 10.0, 100.0]: + logger.info("======= Cover Bench Digits =======") + logger.info(f"[r: {r}]") + logger.info(">>>>>>> HVPT >>>>>>") + run_bench(X, r, dist, HVPT, leaf_radius=r, pivoting="random") + run_bench(X, r, dist, HVPT, leaf_radius=r, pivoting="furthest") + logger.info(">>>>>>> FVPT >>>>>>") + run_bench(X, r, dist, FVPT, leaf_radius=r, pivoting="random") + run_bench(X, r, dist, FVPT, leaf_radius=r, pivoting="furthest") + logger.info(">>>>>> SKBT >>>>>>") + run_bench(X, r, dist, SkBallTree) + run_bench(X, r, dist, SkBallTree, leaf_radius=r) + logger.info("") diff --git a/tests/test_bench_metrics.py b/tests/test_bench_metrics.py index f5644e9..4b7f448 100644 --- a/tests/test_bench_metrics.py +++ b/tests/test_bench_metrics.py @@ -1,6 +1,5 @@ import logging import timeit -import unittest import numba import numpy as np @@ -109,14 +108,13 @@ def run_bench(X): return pd.DataFrame(d) -class TestBenchMetrics(unittest.TestCase): +setup_logging() +logger = logging.getLogger(__name__) - setup_logging() - logger = logging.getLogger(__name__) - def test_bench(self): - X = np.random.rand(1000, 1000) - df_bench = run_bench(X) - df_str = str(df_bench) - for line in df_str.split("\n"): - self.logger.info(line) +def test_bench(): + X = np.random.rand(1000, 1000) + df_bench = run_bench(X) + df_str = str(df_bench) + for line in df_str.split("\n"): + logger.info(line) diff --git a/tests/test_bench_vptree.py b/tests/test_bench_vptree.py index df8ff2e..d60e23c 100644 --- a/tests/test_bench_vptree.py +++ b/tests/test_bench_vptree.py @@ -1,10 +1,10 @@ import logging -import unittest from time import time import numpy as np from sklearn.datasets import load_breast_cancer, load_digits, load_iris +from tdamapper._common import profile from tdamapper.utils.metrics import euclidean, get_metric from tdamapper.utils.vptree_flat.vptree import VPTree as FVPT from tdamapper.utils.vptree_hier.vptree import VPTree as HVPT @@ -21,90 +21,96 @@ def dataset(dim=10, num=1000): return [np.random.rand(dim) for _ in range(num)] -class TestBenchmark(unittest.TestCase): - - setup_logging() - logger = logging.getLogger(__name__) - - eps = 0.25 - - k = 5 - - def test_bench(self): - self.logger.info("==== Dataset random =============") - self._test_compare(dataset()) - self.logger.info("==== Dataset iris ===============") - iris, _ = load_iris(as_frame=True, return_X_y=True) - self._test_compare(list(iris.to_numpy())) - self.logger.info("==== Dataset breast_cancer ======") - breast_cancer, _ = load_breast_cancer(as_frame=True, return_X_y=True) - self._test_compare(list(breast_cancer.to_numpy())) - self.logger.info("==== Dataset digits =============") - digits, _ = load_digits(as_frame=True, return_X_y=True) - self._test_compare(list(digits.to_numpy())) - - def _test_compare(self, data): - self.logger.info("[build]") - hvpt = self._test_build(data, " * HVPT ", HVPT) - fvpt = self._test_build(data, " * FVPT ", FVPT) - skbt = self._test_build(data, " * SKBT", SkBallTree) - self.logger.info("[ball search]") - self._test_ball_search_naive(data, " * Naive ") - self._test_ball_search(data, " * HVPT ", hvpt) - self._test_ball_search(data, " * FVPT ", fvpt) - self._test_ball_search(data, " * SKBT", skbt) - self.logger.info("[knn search]") - self._test_knn_search_naive(data, " * Naive ") - self._test_knn_search(data, " * HVPT ", hvpt) - self._test_knn_search(data, " * FVPT ", fvpt) - self._test_knn_search(data, " * SKBT ", skbt) - - def _test_build(self, data, name, builder): - t0 = time() - vpt = builder( - data, - metric=dist, - leaf_radius=self.eps, - leaf_capacity=self.k, - pivoting="furthest", - ) - t1 = time() - self.logger.info(f"{name}: {t1 - t0}") - return vpt - - def _test_ball_search_naive(self, data, name): - d = get_metric(dist) - d(np.array([0.0]), np.array([0.0])) # jit-compile numba - t0 = time() - for val in data: - [x for x in data if d(val, x) <= self.eps] - t1 = time() - self.logger.info(f"{name}: {t1 - t0}") - - def _test_ball_search(self, data, name, vpt): - t0 = time() - for val in data: - vpt.ball_search(val, self.eps) - t1 = time() - self.logger.info(f"{name}: {t1 - t0}") - - def _test_knn_search_naive(self, data, name): - d = get_metric(dist) - d(np.array([0.0]), np.array([0.0])) # jit-compile numba - t0 = time() - for val in data: - - def _dist_key(x): - return d(x, val) - - data.sort(key=_dist_key) - [x for x in data[: self.k]] - t1 = time() - self.logger.info(f"{name}: {t1 - t0}") - - def _test_knn_search(self, data, name, vpt): - t0 = time() - for val in data: - vpt.knn_search(val, self.k) - t1 = time() - self.logger.info(f"{name}: {t1 - t0}") +setup_logging() +logger = logging.getLogger(__name__) + +eps = 0.25 + +k = 5 + + +@profile(n_lines=20) +def test_bench(): + logger.info("==== Dataset random =============") + _test_compare(dataset()) + logger.info("==== Dataset iris ===============") + iris, _ = load_iris(as_frame=True, return_X_y=True) + _test_compare(list(iris.to_numpy())) + logger.info("==== Dataset breast_cancer ======") + breast_cancer, _ = load_breast_cancer(as_frame=True, return_X_y=True) + _test_compare(list(breast_cancer.to_numpy())) + logger.info("==== Dataset digits =============") + digits, _ = load_digits(as_frame=True, return_X_y=True) + _test_compare(list(digits.to_numpy())) + + +def _test_compare(data): + logger.info("[build]") + hvpt = _test_build(data, " * HVPT ", HVPT) + fvpt = _test_build(data, " * FVPT ", FVPT) + skbt = _test_build(data, " * SKBT", SkBallTree) + logger.info("[ball search]") + _test_ball_search_naive(data, " * Naive ") + _test_ball_search(data, " * HVPT ", hvpt) + _test_ball_search(data, " * FVPT ", fvpt) + _test_ball_search(data, " * SKBT", skbt) + logger.info("[knn search]") + _test_knn_search_naive(data, " * Naive ") + _test_knn_search(data, " * HVPT ", hvpt) + _test_knn_search(data, " * FVPT ", fvpt) + _test_knn_search(data, " * SKBT ", skbt) + + +def _test_build(data, name, builder): + t0 = time() + vpt = builder( + data, + metric=dist, + leaf_radius=eps, + leaf_capacity=k, + pivoting="furthest", + ) + t1 = time() + logger.info(f"{name}: {t1 - t0}") + return vpt + + +def _test_ball_search_naive(data, name): + d = get_metric(dist) + d(np.array([0.0]), np.array([0.0])) # jit-compile numba + t0 = time() + for val in data: + [x for x in data if d(val, x) <= eps] + t1 = time() + logger.info(f"{name}: {t1 - t0}") + + +def _test_ball_search(data, name, vpt): + t0 = time() + for val in data: + vpt.ball_search(val, eps) + t1 = time() + logger.info(f"{name}: {t1 - t0}") + + +def _test_knn_search_naive(data, name): + d = get_metric(dist) + d(np.array([0.0]), np.array([0.0])) # jit-compile numba + t0 = time() + for val in data: + + def _dist_key(x): + return d(x, val) + + data.sort(key=_dist_key) + [x for x in data[:k]] + t1 = time() + logger.info(f"{name}: {t1 - t0}") + + +def _test_knn_search(data, name, vpt): + t0 = time() + for val in data: + vpt.knn_search(val, k) + t1 = time() + logger.info(f"{name}: {t1 - t0}") diff --git a/tests/test_unit_core.py b/tests/test_unit_core.py index 5cb6843..5a70b4e 100644 --- a/tests/test_unit_core.py +++ b/tests/test_unit_core.py @@ -1,7 +1,6 @@ -import unittest - import networkx as nx import numpy as np +import pytest from sklearn.cluster import DBSCAN, AgglomerativeClustering from sklearn.datasets import load_digits from sklearn.decomposition import PCA @@ -27,204 +26,215 @@ def dataset(dim=10, num=1000): return [np.random.rand(dim) for _ in range(num)] -class TestMapper(unittest.TestCase): - - def test_trivial(self): - data = dataset() - mp = MapperAlgorithm(TrivialCover(), TrivialClustering()) - g = mp.fit_transform(data, data) - self.assertEqual(1, len(g)) - self.assertEqual([], list(g.neighbors(0))) - ccs = list(nx.connected_components(g)) - self.assertEqual(1, len(ccs)) - ccs2 = mapper_connected_components( - data, - data, - TrivialCover(), - TrivialClustering(), - ) - self.assertEqual(len(data), len(ccs2)) - - def test_ball_small_radius(self): - data = np.array([[float(i)] for i in range(1000)]) - cover = BallCover(0.5, metric=dist) - clustering = TrivialClustering() - mp = MapperAlgorithm(cover, clustering) - g = mp.fit_transform(data, data) - self.assertEqual(1000, len(g)) - for node in g.nodes(): - self.assertEqual([], list(g.neighbors(node))) - ccs = list(nx.connected_components(g)) - self.assertEqual(1000, len(ccs)) - ccs2 = mapper_connected_components(data, data, cover, clustering) - self.assertEqual(len(data), len(ccs2)) - - def test_ball_small_radius_list(self): - data = [np.array([float(i)]) for i in range(1000)] - cover = BallCover(0.5, metric=dist) - clustering = DBSCAN(eps=1.0, min_samples=1) - mp = MapperAlgorithm(cover=cover, clustering=clustering) - g = mp.fit_transform(data, data) - self.assertEqual(1000, len(g)) - for node in g.nodes(): - self.assertEqual([], list(g.neighbors(node))) - ccs = list(nx.connected_components(g)) - self.assertEqual(1000, len(ccs)) - ccs2 = mapper_connected_components(data, data, cover, clustering) - self.assertEqual(len(data), len(ccs2)) - - def test_ball_large_radius(self): - data = np.array([[float(i)] for i in range(1000)]) - cover = BallCover(1000.0, metric=dist) - clustering = TrivialClustering() - mp = MapperAlgorithm(cover=cover, clustering=clustering) - g = mp.fit_transform(data, data) - self.assertEqual(1, len(g)) - for node in g.nodes(): - self.assertEqual([], list(g.neighbors(node))) - ccs = list(nx.connected_components(g)) - self.assertEqual(1, len(ccs)) - ccs2 = mapper_connected_components(data, data, cover, clustering) - self.assertEqual(len(data), len(ccs2)) - - def test_ball_two_disconnected_clusters(self): - data = [np.array([float(i), 0.0]) for i in range(100)] - data.extend([np.array([float(i), 500.0]) for i in range(100)]) - data = np.array(data) - cover = BallCover(150.0, metric=dist) - clustering = TrivialClustering() - mp = MapperAlgorithm(cover=cover, clustering=clustering) - g = mp.fit_transform(data, data) - self.assertEqual(2, len(g)) - for node in g.nodes(): - self.assertEqual([], list(g.neighbors(node))) - ccs = list(nx.connected_components(g)) - self.assertEqual(2, len(ccs)) - ccs2 = mapper_connected_components(data, data, cover, clustering) - self.assertEqual(len(data), len(ccs2)) - - def test_ball_two_connected_clusters(self): - data = [ - np.array([0.0, 1.0]), - np.array([1.0, 0.0]), - np.array([0.0, 0.0]), - np.array([1.0, 1.0]), - ] - cover = BallCover(1.1, metric=dist) - clustering = TrivialClustering() - mp = MapperAlgorithm(cover=cover, clustering=clustering) - g = mp.fit_transform(data, data) - self.assertEqual(2, len(g)) - for node in g.nodes(): - self.assertEqual(1, len(list(g.neighbors(node)))) - ccs = list(nx.connected_components(g)) - self.assertEqual(1, len(ccs)) - ccs2 = mapper_connected_components(data, data, cover, clustering) - self.assertEqual(len(data), len(ccs2)) - - def test_ball_two_connected_clusters_parallel(self): - data = [ - np.array([0.0, 1.0]), - np.array([1.0, 0.0]), - np.array([0.0, 0.0]), - np.array([1.0, 1.0]), - ] - cover = BallCover(1.1, metric=dist) - clustering = TrivialClustering() - mp = MapperAlgorithm( - cover=cover, - clustering=clustering, - n_jobs=4, - ) - g = mp.fit_transform(data, data) - self.assertEqual(2, len(g)) - for node in g.nodes(): - self.assertEqual(1, len(list(g.neighbors(node)))) - ccs = list(nx.connected_components(g)) - self.assertEqual(1, len(ccs)) - ccs2 = mapper_connected_components(data, data, cover, clustering) - self.assertEqual(len(data), len(ccs2)) - - def test_proximity_cubical_line(self): - data = np.array([[float(i)] for i in range(1000)]) - cover = ProximityCubicalCover(n_intervals=4, overlap_frac=0.5) - clustering = TrivialClustering() - mp = MapperAlgorithm(cover, clustering) - g = mp.fit_transform(data, data) - self.assertEqual(4, len(g.nodes)) - - def test_standard_cubical_line(self): - data = np.array([[float(i)] for i in range(1000)]) - cover = StandardCubicalCover(n_intervals=4, overlap_frac=0.5) - clustering = TrivialClustering() - mp = MapperAlgorithm(cover, clustering) - g = mp.fit_transform(data, data) - self.assertEqual(4, len(g.nodes)) - - def test_cubical_line(self): - data = np.array([[float(i)] for i in range(1000)]) - cover = CubicalCover(n_intervals=4, overlap_frac=0.5) - clustering = TrivialClustering() - mp = MapperAlgorithm(cover, clustering) - g = mp.fit_transform(data, data) - self.assertEqual(4, len(g.nodes)) - - def test_cubical_no_overlap(self): - data = np.array([[0.0], [1.0], [2.0]]) - cover = StandardCubicalCover(n_intervals=2, overlap_frac=0) - clustering = TrivialClustering() - mp = MapperAlgorithm(cover, clustering) - with self.assertRaises(ValueError): - mp.fit_transform(data, data) - - def test_mock_connected_components(self): - data = [0, 1, 2, 3] - - class MockCover: - - def apply(self, X): - yield [0, 3] - yield [1, 3] - yield [1, 2] - yield [0, 1, 3] - - cover = MockCover() - clustering = TrivialClustering() - ccs = mapper_connected_components(data, data, cover, clustering) - self.assertEqual(len(data), len(ccs)) - cc0 = ccs[0] - self.assertEqual(cc0, ccs[1]) - self.assertEqual(cc0, ccs[2]) - self.assertEqual(cc0, ccs[3]) - - def test_mock_labels(self): - data = [0, 1, 2, 3] - - class MockCover: - - def apply(self, X): - yield [0, 3] - yield [1, 3] - yield [1, 2] - yield [0, 1, 3] - - cover = MockCover() - clustering = TrivialClustering() - labels = mapper_labels(data, data, cover, clustering) - self.assertEqual(len(data), len(labels)) - self.assertEqual([0, 3], labels[0]) - self.assertEqual([1, 2, 3], labels[1]) - self.assertEqual([2], labels[2]) - self.assertEqual([0, 1, 3], labels[3]) - - def test_full(self): - X, _ = load_digits(return_X_y=True) - y = PCA(2, random_state=42).fit_transform(X) - mapper = MapperAlgorithm( - cover=CubicalCover(n_intervals=10, overlap_frac=0.5), - clustering=AgglomerativeClustering(10), - verbose=False, - ) - graph = mapper.fit_transform(X, y) - self.assertEqual(381, len(graph.nodes())) - self.assertEqual(736, len(graph.edges())) +def test_trivial(): + data = dataset() + mp = MapperAlgorithm(TrivialCover(), TrivialClustering()) + g = mp.fit_transform(data, data) + assert 1 == len(g) + assert [] == list(g.neighbors(0)) + ccs = list(nx.connected_components(g)) + assert 1 == len(ccs) + ccs2 = mapper_connected_components( + data, + data, + TrivialCover(), + TrivialClustering(), + ) + assert len(data) == len(ccs2) + + +def test_ball_small_radius(): + data = np.array([[float(i)] for i in range(1000)]) + cover = BallCover(0.5, metric=dist) + clustering = TrivialClustering() + mp = MapperAlgorithm(cover, clustering) + g = mp.fit_transform(data, data) + assert 1000 == len(g) + for node in g.nodes(): + assert [] == list(g.neighbors(node)) + ccs = list(nx.connected_components(g)) + assert 1000 == len(ccs) + ccs2 = mapper_connected_components(data, data, cover, clustering) + assert len(data) == len(ccs2) + + +def test_ball_small_radius_list(): + data = [np.array([float(i)]) for i in range(1000)] + cover = BallCover(0.5, metric=dist) + clustering = DBSCAN(eps=1.0, min_samples=1) + mp = MapperAlgorithm(cover=cover, clustering=clustering) + g = mp.fit_transform(data, data) + assert 1000 == len(g) + for node in g.nodes(): + assert [] == list(g.neighbors(node)) + ccs = list(nx.connected_components(g)) + assert 1000 == len(ccs) + ccs2 = mapper_connected_components(data, data, cover, clustering) + assert len(data) == len(ccs2) + + +def test_ball_large_radius(): + data = np.array([[float(i)] for i in range(1000)]) + cover = BallCover(1000.0, metric=dist) + clustering = TrivialClustering() + mp = MapperAlgorithm(cover=cover, clustering=clustering) + g = mp.fit_transform(data, data) + assert 1 == len(g) + for node in g.nodes(): + assert [] == list(g.neighbors(node)) + ccs = list(nx.connected_components(g)) + assert 1 == len(ccs) + ccs2 = mapper_connected_components(data, data, cover, clustering) + assert len(data) == len(ccs2) + + +def test_ball_two_disconnected_clusters(): + data = [np.array([float(i), 0.0]) for i in range(100)] + data.extend([np.array([float(i), 500.0]) for i in range(100)]) + data = np.array(data) + cover = BallCover(150.0, metric=dist) + clustering = TrivialClustering() + mp = MapperAlgorithm(cover=cover, clustering=clustering) + g = mp.fit_transform(data, data) + assert 2 == len(g) + for node in g.nodes(): + assert [] == list(g.neighbors(node)) + ccs = list(nx.connected_components(g)) + assert 2 == len(ccs) + ccs2 = mapper_connected_components(data, data, cover, clustering) + assert len(data) == len(ccs2) + + +def test_ball_two_connected_clusters(): + data = [ + np.array([0.0, 1.0]), + np.array([1.0, 0.0]), + np.array([0.0, 0.0]), + np.array([1.0, 1.0]), + ] + cover = BallCover(1.1, metric=dist) + clustering = TrivialClustering() + mp = MapperAlgorithm(cover=cover, clustering=clustering) + g = mp.fit_transform(data, data) + assert 2 == len(g) + for node in g.nodes(): + assert 1 == len(list(g.neighbors(node))) + ccs = list(nx.connected_components(g)) + assert 1 == len(ccs) + ccs2 = mapper_connected_components(data, data, cover, clustering) + assert len(data) == len(ccs2) + + +def test_ball_two_connected_clusters_parallel(): + data = [ + np.array([0.0, 1.0]), + np.array([1.0, 0.0]), + np.array([0.0, 0.0]), + np.array([1.0, 1.0]), + ] + cover = BallCover(1.1, metric=dist) + clustering = TrivialClustering() + mp = MapperAlgorithm( + cover=cover, + clustering=clustering, + n_jobs=4, + ) + g = mp.fit_transform(data, data) + assert 2 == len(g) + for node in g.nodes(): + assert 1 == len(list(g.neighbors(node))) + ccs = list(nx.connected_components(g)) + assert 1 == len(ccs) + ccs2 = mapper_connected_components(data, data, cover, clustering) + assert len(data) == len(ccs2) + + +def test_proximity_cubical_line(): + data = np.array([[float(i)] for i in range(1000)]) + cover = ProximityCubicalCover(n_intervals=4, overlap_frac=0.5) + clustering = TrivialClustering() + mp = MapperAlgorithm(cover, clustering) + g = mp.fit_transform(data, data) + assert 4 == len(g.nodes) + + +def test_standard_cubical_line(): + data = np.array([[float(i)] for i in range(1000)]) + cover = StandardCubicalCover(n_intervals=4, overlap_frac=0.5) + clustering = TrivialClustering() + mp = MapperAlgorithm(cover, clustering) + g = mp.fit_transform(data, data) + assert 4 == len(g.nodes) + + +def test_cubical_line(): + data = np.array([[float(i)] for i in range(1000)]) + cover = CubicalCover(n_intervals=4, overlap_frac=0.5) + clustering = TrivialClustering() + mp = MapperAlgorithm(cover, clustering) + g = mp.fit_transform(data, data) + assert 4 == len(g.nodes) + + +def test_cubical_no_overlap(): + data = np.array([[0.0], [1.0], [2.0]]) + cover = StandardCubicalCover(n_intervals=2, overlap_frac=0) + clustering = TrivialClustering() + mp = MapperAlgorithm(cover, clustering) + with pytest.raises(ValueError): + mp.fit_transform(data, data) + + +def test_mock_connected_components(): + data = [0, 1, 2, 3] + + class MockCover: + + def apply(self, X): + yield [0, 3] + yield [1, 3] + yield [1, 2] + yield [0, 1, 3] + + cover = MockCover() + clustering = TrivialClustering() + ccs = mapper_connected_components(data, data, cover, clustering) + assert len(data) == len(ccs) + cc0 = ccs[0] + assert cc0 == ccs[1] + assert cc0 == ccs[2] + assert cc0 == ccs[3] + + +def test_mock_labels(): + data = [0, 1, 2, 3] + + class MockCover: + + def apply(self, X): + yield [0, 3] + yield [1, 3] + yield [1, 2] + yield [0, 1, 3] + + cover = MockCover() + clustering = TrivialClustering() + labels = mapper_labels(data, data, cover, clustering) + assert len(data) == len(labels) + assert [0, 3] == labels[0] + assert [1, 2, 3] == labels[1] + assert [2] == labels[2] + assert [0, 1, 3] == labels[3] + + +def test_full(): + X, _ = load_digits(return_X_y=True) + y = PCA(2, random_state=42).fit_transform(X) + mapper = MapperAlgorithm( + cover=CubicalCover(n_intervals=10, overlap_frac=0.5), + clustering=AgglomerativeClustering(10), + verbose=False, + ) + graph = mapper.fit_transform(X, y) + assert 381 == len(graph.nodes()) + assert 736 == len(graph.edges()) diff --git a/tests/test_unit_cover.py b/tests/test_unit_cover.py index eace8a7..b9c5add 100644 --- a/tests/test_unit_cover.py +++ b/tests/test_unit_cover.py @@ -1,5 +1,3 @@ -import unittest - import numpy as np from tdamapper.core import TrivialCover @@ -10,64 +8,67 @@ def dataset(dim=1, num=10000): return [np.random.rand(dim) for _ in range(num)] -class TestCover(unittest.TestCase): +def test_trivial_cover(): + data = dataset() + cover = TrivialCover() + charts = list(cover.apply(data)) + assert 1 == len(charts) + + +def test_ball_cover_simple(): + data = [ + np.array([0.0, 1.0]), + np.array([1.0, 0.0]), + np.array([0.0, 0.0]), + np.array([1.0, 1.0]), + ] + cover = BallCover(radius=1.1, metric="euclidean") + charts = list(cover.apply(data)) + assert 2 == len(charts) + + +def test_knn_cover_simple(): + data = [ + np.array([0.0, 1.0]), + np.array([1.1, 0.0]), + np.array([0.0, 0.0]), + np.array([1.1, 1.0]), + ] + cover = KNNCover(neighbors=2, metric="euclidean") + charts = list(cover.apply(data)) + assert 2 == len(charts) - def test_trivial_cover(self): - data = dataset() - cover = TrivialCover() - charts = list(cover.apply(data)) - self.assertEqual(1, len(charts)) - def test_ball_cover_simple(self): - data = [ - np.array([0.0, 1.0]), - np.array([1.0, 0.0]), - np.array([0.0, 0.0]), - np.array([1.0, 1.0]), - ] - cover = BallCover(radius=1.1, metric="euclidean") - charts = list(cover.apply(data)) - self.assertEqual(2, len(charts)) +def test_cubical_cover_simple(): + data = [ + np.array([0.0, 1.0]), + np.array([1.1, 0.0]), + np.array([0.0, 0.0]), + np.array([1.1, 1.0]), + ] + cover = CubicalCover(n_intervals=2, overlap_frac=0.5) + charts = list(cover.apply(data)) + assert 4 == len(charts) - def test_knn_cover_simple(self): - data = [ - np.array([0.0, 1.0]), - np.array([1.1, 0.0]), - np.array([0.0, 0.0]), - np.array([1.1, 1.0]), - ] - cover = KNNCover(neighbors=2, metric="euclidean") - charts = list(cover.apply(data)) - self.assertEqual(2, len(charts)) - def test_cubical_cover_simple(self): - data = [ - np.array([0.0, 1.0]), - np.array([1.1, 0.0]), - np.array([0.0, 0.0]), - np.array([1.1, 1.0]), - ] - cover = CubicalCover(n_intervals=2, overlap_frac=0.5) - charts = list(cover.apply(data)) - self.assertEqual(4, len(charts)) +def test_params(): + cover = CubicalCover(n_intervals=2, overlap_frac=0.5) + params = cover.get_params(deep=True) + assert 2 == params["n_intervals"] + assert 0.5 == params["overlap_frac"] - def test_params(self): - cover = CubicalCover(n_intervals=2, overlap_frac=0.5) - params = cover.get_params(deep=True) - self.assertEqual(2, params["n_intervals"]) - self.assertEqual(0.5, params["overlap_frac"]) - def test_standard_cover_simple(self): - data = [ - np.array([0.0, 1.0]), - np.array([1.1, 0.0]), - np.array([0.0, 0.0]), - np.array([1.1, 1.0]), - ] - cover = CubicalCover( - n_intervals=2, - overlap_frac=0.5, - algorithm="standard", - ) - charts = list(cover.apply(data)) - self.assertEqual(4, len(charts)) +def test_standard_cover_simple(): + data = [ + np.array([0.0, 1.0]), + np.array([1.1, 0.0]), + np.array([0.0, 0.0]), + np.array([1.1, 1.0]), + ] + cover = CubicalCover( + n_intervals=2, + overlap_frac=0.5, + algorithm="standard", + ) + charts = list(cover.apply(data)) + assert 4 == len(charts) diff --git a/tests/test_unit_heap.py b/tests/test_unit_heap.py index 0ffcedf..55f0ce3 100644 --- a/tests/test_unit_heap.py +++ b/tests/test_unit_heap.py @@ -1,5 +1,4 @@ import random -import unittest from tdamapper.utils.heap import MaxHeap @@ -11,30 +10,30 @@ def maxheap(data): return m -class TestMaxHeap(unittest.TestCase): - - def test_empty(self): - m = MaxHeap() - self.assertEqual(0, len(m)) - - def test_max(self): - data = list(range(10)) - random.shuffle(data) - m = maxheap(data) - self.assertEqual((9, 9), m.top()) - self.assertEqual(10, len(m)) - - def test_max_random(self): - data = random.sample(list(range(1000)), 100) - m = maxheap(data) - self.assertEqual(100, len(m)) - max_data = max(data) - self.assertEqual((max_data, max_data), m.top()) - self.assertNotEqual(0, len(m)) - collected = [] - for _ in range(10): - collected.append(m.pop()) - data.sort() - collected.sort() - self.assertEqual(collected, [(x, x) for x in data[-10:]]) - self.assertEqual(90, len(m)) +def test_empty(): + m = MaxHeap() + assert 0 == len(m) + + +def test_max(): + data = list(range(10)) + random.shuffle(data) + m = maxheap(data) + assert (9, 9) == m.top() + assert 10 == len(m) + + +def test_max_random(): + data = random.sample(list(range(1000)), 100) + m = maxheap(data) + assert 100 == len(m) + max_data = max(data) + assert (max_data, max_data) == m.top() + assert 0 != len(m) + collected = [] + for _ in range(10): + collected.append(m.pop()) + data.sort() + collected.sort() + assert collected == [(x, x) for x in data[-10:]] + assert 90 == len(m) diff --git a/tests/test_unit_knn.py b/tests/test_unit_knn.py index a38e1c9..09c3035 100644 --- a/tests/test_unit_knn.py +++ b/tests/test_unit_knn.py @@ -1,5 +1,3 @@ -import unittest - import numpy as np from tdamapper.cover import KNNCover @@ -95,67 +93,68 @@ x = np.array([99.73199663, 100.8024564]) -class TestKNN(unittest.TestCase): - - def test_knn_search(self): - knn_cover = KNNCover(neighbors=5, metric="euclidean") - knn_cover.fit(X) - neigh_ids = knn_cover.search(x) - d = euclidean() - dists = [d(x, X[j]) for j in neigh_ids] - x_dist = d(x, X[5]) - self.assertTrue(x_dist in dists) - - def test_vptree(self): - vptree = VPTree(X[:80], metric="euclidean", leaf_capacity=5) - neigh = vptree.knn_search(x, 5) - d = euclidean() - dists = [d(x, y) for y in neigh] - x_dist = d(x, X[5]) - self.check_vptree(vptree) - self.assertTrue(x_dist in dists) - - def test_vptree_simple(self): - XX = np.array([np.array([x, x / 2]) for x in range(30)]) - vptree = VPTree(XX, metric="euclidean", leaf_capacity=5, leaf_radius=0.0) - xx = np.array([3, 3 / 2]) - neigh = vptree.knn_search(xx, 2) - d = euclidean() - dists = [d(xx, y) for y in neigh] - self.check_vptree(vptree) - self.assertTrue(0.0 in dists) - - def check_vptree(self, vpt): - arr = vpt._get_arr() - data = arr._dataset - distances = arr._distances - indices = arr._indices - - dist = vpt._get_distance() - leaf_capacity = vpt.get_leaf_capacity() - leaf_radius = vpt.get_leaf_radius() - - def check_sub(start, end): - v_radius = distances[start] - v_point_index = indices[start] - v_point = data[v_point_index] - +def test_knn_search(): + knn_cover = KNNCover(neighbors=5, metric="euclidean") + knn_cover.fit(X) + neigh_ids = knn_cover.search(x) + d = euclidean() + dists = [d(x, X[j]) for j in neigh_ids] + x_dist = d(x, X[5]) + assert x_dist in dists + + +def test_vptree(): + vptree = VPTree(X[:80], metric="euclidean", leaf_capacity=5) + neigh = vptree.knn_search(x, 5) + d = euclidean() + dists = [d(x, y) for y in neigh] + x_dist = d(x, X[5]) + check_vptree(vptree) + assert x_dist in dists + + +def test_vptree_simple(): + XX = np.array([np.array([x, x / 2]) for x in range(30)]) + vptree = VPTree(XX, metric="euclidean", leaf_capacity=5, leaf_radius=0.0) + xx = np.array([3, 3 / 2]) + neigh = vptree.knn_search(xx, 2) + d = euclidean() + dists = [d(xx, y) for y in neigh] + check_vptree(vptree) + assert 0.0 in dists + + +def check_vptree(vpt): + arr = vpt._get_arr() + data = arr._dataset + distances = arr._distances + indices = arr._indices + + dist = vpt._get_distance() + leaf_capacity = vpt.get_leaf_capacity() + leaf_radius = vpt.get_leaf_radius() + + def check_sub(start, end): + v_radius = distances[start] + v_point_index = indices[start] + v_point = data[v_point_index] + + mid = (start + end) // 2 + for i in range(start + 1, mid): + y_index = indices[i] + y = data[y_index] + assert dist(v_point, y) <= v_radius + for i in range(mid, end): + y_index = indices[i] + y = data[y_index] + assert dist(v_point, y) >= v_radius + + def check_rec(start, end): + v_radius = distances[start] + if (end - start > leaf_capacity) and (v_radius > leaf_radius): + check_sub(start, end) mid = (start + end) // 2 - for i in range(start + 1, mid): - y_index = indices[i] - y = data[y_index] - self.assertTrue(dist(v_point, y) <= v_radius) - for i in range(mid, end): - y_index = indices[i] - y = data[y_index] - self.assertTrue(dist(v_point, y) >= v_radius) - - def check_rec(start, end): - v_radius = distances[start] - if (end - start > leaf_capacity) and (v_radius > leaf_radius): - check_sub(start, end) - mid = (start + end) // 2 - check_rec(start + 1, mid) - check_rec(mid, end) - - check_rec(0, len(data)) + check_rec(start + 1, mid) + check_rec(mid, end) + + check_rec(0, len(data)) diff --git a/tests/test_unit_learn.py b/tests/test_unit_learn.py index de9fc93..7072c33 100644 --- a/tests/test_unit_learn.py +++ b/tests/test_unit_learn.py @@ -1,5 +1,3 @@ -import unittest - import networkx as nx import numpy as np from sklearn.utils.estimator_checks import check_estimator @@ -17,29 +15,31 @@ def dataset(dim=10, num=1000): return [np.random.rand(dim) for _ in range(num)] -class TestMapper(unittest.TestCase): +def run_tests(estimator): + for est, check in check_estimator(estimator, generate_only=True): + check(est) + + +def test_mapper_learn(): + data = dataset() + mp = MapperAlgorithm(TrivialCover(), TrivialClustering()) + g = mp.fit_transform(data, data) + assert 1 == len(g) + assert [] == list(g.neighbors(0)) + ccs = list(nx.connected_components(g)) + assert 1 == len(ccs) + - def run_tests(self, estimator): - for est, check in check_estimator(estimator, generate_only=True): - check(est) +def test_mapper_learn_est(): + est = MapperAlgorithm() + run_tests(est) - def test_mapper_learn(self): - data = dataset() - mp = MapperAlgorithm(TrivialCover(), TrivialClustering()) - g = mp.fit_transform(data, data) - self.assertEqual(1, len(g)) - self.assertEqual([], list(g.neighbors(0))) - ccs = list(nx.connected_components(g)) - self.assertEqual(1, len(ccs)) - def test_mapper_learn_est(self): - est = MapperAlgorithm() - self.run_tests(est) +def test_mapper_clustering_trivial(): + est = MapperClustering() + run_tests(est) - def test_mapper_clustering_trivial(self): - est = MapperClustering() - self.run_tests(est) - def test_mapper_clustering_ball(self): - est = MapperClustering(cover=BallCover(metric=euclidean)) - self.run_tests(est) +def test_mapper_clustering_ball(): + est = MapperClustering(cover=BallCover(metric=euclidean)) + run_tests(est) diff --git a/tests/test_unit_metrics.py b/tests/test_unit_metrics.py index 27072a8..ca2c192 100644 --- a/tests/test_unit_metrics.py +++ b/tests/test_unit_metrics.py @@ -1,53 +1,51 @@ -import unittest - import numpy as np import tdamapper.utils.metrics as metrics -class TestMetrics(unittest.TestCase): - - def test_euclidean(self): - d = metrics.euclidean() - a = np.array([1.0, 0.0]) - b = np.array([0.0, 1.0]) - ab = d(a, b) - self.assertGreaterEqual(ab, 1.414) - self.assertLessEqual(ab, 1.415) - - def test_manhattan(self): - d = metrics.manhattan() - a = np.array([1.0, 0.0]) - b = np.array([0.0, 1.0]) - ab = d(a, b) - self.assertEqual(ab, 2.0) - - def test_chebyshev(self): - d = metrics.chebyshev() - a = np.array([1.0, 0.0]) - b = np.array([0.0, 1.0]) - ab = d(a, b) - self.assertEqual(ab, 1.0) - - def test_cosine(self): - d = metrics.cosine() - a = np.array([1.0, 0.0]) - b = np.array([0.0, 1.0]) - c = np.array([0.0, 2.0]) - ab = d(a, b) - self.assertGreaterEqual(ab, 1.414) - self.assertLessEqual(ab, 1.415) - bc = d(b, c) - self.assertEqual(bc, 0.0) - - def test_get_metric(self): - self.assertEqual(metrics.euclidean(), metrics.get_metric("euclidean")) - self.assertEqual(metrics.euclidean(), metrics.get_metric("minkowski")) - self.assertEqual(metrics.chebyshev(), metrics.get_metric("chebyshev")) - self.assertEqual(metrics.chebyshev(), metrics.get_metric("minkowski", p=np.inf)) - self.assertEqual( - metrics.chebyshev(), metrics.get_metric("minkowski", p=float("inf")) - ) - self.assertEqual(metrics.manhattan(), metrics.get_metric("manhattan")) - self.assertEqual(metrics.manhattan(), metrics.get_metric("minkowski", p=1)) - self.assertEqual(metrics.cosine(), metrics.get_metric("cosine")) +def test_euclidean(): + d = metrics.euclidean() + a = np.array([1.0, 0.0]) + b = np.array([0.0, 1.0]) + ab = d(a, b) + assert ab >= 1.414 + assert ab <= 1.415 + + +def test_manhattan(): + d = metrics.manhattan() + a = np.array([1.0, 0.0]) + b = np.array([0.0, 1.0]) + ab = d(a, b) + assert ab == 2.0 + + +def test_chebyshev(): + d = metrics.chebyshev() + a = np.array([1.0, 0.0]) + b = np.array([0.0, 1.0]) + ab = d(a, b) + assert ab == 1.0 + + +def test_cosine(): + d = metrics.cosine() + a = np.array([1.0, 0.0]) + b = np.array([0.0, 1.0]) + c = np.array([0.0, 2.0]) + ab = d(a, b) + assert ab >= 1.414 + assert ab <= 1.415 + bc = d(b, c) + assert bc == 0.0 + + +def test_get_metric(): + assert metrics.euclidean() == metrics.get_metric("euclidean") + assert metrics.euclidean() == metrics.get_metric("minkowski") + assert metrics.chebyshev() == metrics.get_metric("chebyshev") + assert metrics.chebyshev() == metrics.get_metric("minkowski", p=np.inf) + assert metrics.chebyshev() == metrics.get_metric("minkowski", p=float("inf")) + assert metrics.manhattan() == metrics.get_metric("manhattan") + assert metrics.manhattan() == metrics.get_metric("minkowski", p=1) + assert metrics.cosine() == metrics.get_metric("cosine") diff --git a/tests/test_unit_params.py b/tests/test_unit_params.py index be7255b..fd594e3 100644 --- a/tests/test_unit_params.py +++ b/tests/test_unit_params.py @@ -1,5 +1,3 @@ -import unittest - from sklearn.cluster import DBSCAN from tdamapper._common import clone @@ -7,120 +5,126 @@ from tdamapper.learn import MapperAlgorithm, MapperClustering -class TestParams(unittest.TestCase): +def __test_clone(obj): + obj_repr = repr(obj) + obj_cln = clone(obj) + cln_repr = repr(obj_cln) + assert obj_repr == cln_repr + + +def __test_repr(obj): + obj_repr = repr(obj) + _obj = eval(obj_repr) + _obj_repr = repr(_obj) + assert obj_repr == _obj_repr + + +def __test_clone_and_repr(obj): + __test_clone(obj) + __test_repr(obj) + + +def test_params_mapper_algorithm(): + est = MapperAlgorithm( + cover=CubicalCover( + n_intervals=3, + overlap_frac=0.3, + ), + ) + params = est.get_params(deep=False) + assert 5 == len(params) + params = est.get_params() + assert 12 == len(params) + assert 3 == params["cover__n_intervals"] + assert 0.3 == params["cover__overlap_frac"] + est.set_params(cover__n_intervals=2, cover__overlap_frac=0.2) + params = est.get_params() + assert 12 == len(params) + assert 2 == params["cover__n_intervals"] + assert 0.2 == params["cover__overlap_frac"] + + +def test_params_mapper_clustering(): + est = MapperClustering( + cover=CubicalCover( + n_intervals=3, + overlap_frac=0.3, + ), + ) + params = est.get_params(deep=False) + assert 3 == len(params) + params = est.get_params() + assert 10 == len(params) + assert 3 == params["cover__n_intervals"] + assert 0.3 == params["cover__overlap_frac"] + est.set_params(cover__n_intervals=2, cover__overlap_frac=0.2) + params = est.get_params() + assert 10 == len(params) + assert 2 == params["cover__n_intervals"] + assert 0.2 == params["cover__overlap_frac"] + + +def test_clone_and_repr_ball_cover(): + __test_clone_and_repr(BallCover()) + __test_clone_and_repr( + BallCover( + radius=2.0, + metric="test", + metric_params={"f": 4}, + kind="kind_test", + leaf_capacity=3.0, + leaf_radius=-2.0, + pivoting=7, + ) + ) - def __test_clone(self, obj): - obj_repr = repr(obj) - obj_cln = clone(obj) - cln_repr = repr(obj_cln) - self.assertEqual(obj_repr, cln_repr) - def __test_repr(self, obj): - obj_repr = repr(obj) - _obj = eval(obj_repr) - _obj_repr = repr(_obj) - self.assertEqual(obj_repr, _obj_repr) +def test_clone_and_repr_cubical_cover(): + __test_clone_and_repr(CubicalCover()) + __test_clone_and_repr( + CubicalCover( + n_intervals=4, + overlap_frac=5, + algorithm="algo_test", + kind="simple", + leaf_radius=5, + leaf_capacity=6, + pivoting="no", + ) + ) - def __test_clone_and_repr(self, obj): - self.__test_clone(obj) - self.__test_repr(obj) - def test_params_mapper_algorithm(self): - est = MapperAlgorithm( +def test_clone_repr_mapper_algorithm(): + __test_clone_and_repr(MapperAlgorithm()) + __test_clone_and_repr( + MapperAlgorithm( cover=CubicalCover( n_intervals=3, overlap_frac=0.3, ), + clustering=DBSCAN( + eps="none", + min_samples=5.4, + ), + failsafe=4, + n_jobs="foo", + verbose=4, ) - params = est.get_params(deep=False) - self.assertEqual(5, len(params)) - params = est.get_params() - self.assertEqual(12, len(params)) - self.assertEqual(3, params["cover__n_intervals"]) - self.assertEqual(0.3, params["cover__overlap_frac"]) - est.set_params(cover__n_intervals=2, cover__overlap_frac=0.2) - params = est.get_params() - self.assertEqual(12, len(params)) - self.assertEqual(2, params["cover__n_intervals"]) - self.assertEqual(0.2, params["cover__overlap_frac"]) - - def test_params_mapper_clustering(self): - est = MapperClustering( + ) + + +def test_clone_repr_mapper_clustering(): + __test_clone_and_repr(MapperClustering()) + __test_clone_and_repr( + MapperClustering( cover=CubicalCover( n_intervals=3, overlap_frac=0.3, ), + clustering=DBSCAN( + eps="none", + min_samples=5.4, + ), + n_jobs="foo", ) - params = est.get_params(deep=False) - self.assertEqual(3, len(params)) - params = est.get_params() - self.assertEqual(10, len(params)) - self.assertEqual(3, params["cover__n_intervals"]) - self.assertEqual(0.3, params["cover__overlap_frac"]) - est.set_params(cover__n_intervals=2, cover__overlap_frac=0.2) - params = est.get_params() - self.assertEqual(10, len(params)) - self.assertEqual(2, params["cover__n_intervals"]) - self.assertEqual(0.2, params["cover__overlap_frac"]) - - def test_clone_and_repr_ball_cover(self): - self.__test_clone_and_repr(BallCover()) - self.__test_clone_and_repr( - BallCover( - radius=2.0, - metric="test", - metric_params={"f": 4}, - kind="kind_test", - leaf_capacity=3.0, - leaf_radius=-2.0, - pivoting=7, - ) - ) - - def test_clone_and_repr_cubical_cover(self): - self.__test_clone_and_repr(CubicalCover()) - self.__test_clone_and_repr( - CubicalCover( - n_intervals=4, - overlap_frac=5, - algorithm="algo_test", - kind="simple", - leaf_radius=5, - leaf_capacity=6, - pivoting="no", - ) - ) - - def test_clone_repr_mapper_algorithm(self): - self.__test_clone_and_repr(MapperAlgorithm()) - self.__test_clone_and_repr( - MapperAlgorithm( - cover=CubicalCover( - n_intervals=3, - overlap_frac=0.3, - ), - clustering=DBSCAN( - eps="none", - min_samples=5.4, - ), - failsafe=4, - n_jobs="foo", - verbose=4, - ) - ) - - def test_clone_repr_mapper_clustering(self): - self.__test_clone_and_repr(MapperClustering()) - self.__test_clone_and_repr( - MapperClustering( - cover=CubicalCover( - n_intervals=3, - overlap_frac=0.3, - ), - clustering=DBSCAN( - eps="none", - min_samples=5.4, - ), - n_jobs="foo", - ) - ) + ) diff --git a/tests/test_unit_plot.py b/tests/test_unit_plot.py index 241c697..1cdc89f 100644 --- a/tests/test_unit_plot.py +++ b/tests/test_unit_plot.py @@ -1,5 +1,3 @@ -import unittest - import networkx as nx import numpy as np @@ -9,111 +7,111 @@ from tdamapper.plot import MapperPlot -class TestMapperPlot(unittest.TestCase): +def test_two_connected_clusters(): + data = [ + np.array([0.0, 1.0]), + np.array([1.0, 0.0]), + np.array([0.0, 0.0]), + np.array([1.0, 1.0]), + ] + mp = MapperAlgorithm( + cover=BallCover(1.1, metric="euclidean"), clustering=TrivialClustering() + ) + g = mp.fit_transform(data, data) + mp_plot1 = MapperPlot(g, dim=2, seed=123, iterations=10) + mp_plot1.plot_plotly( + colors=data, + agg=np.nanmax, + width=200, + height=200, + title="example", + cmap="jet", + ) + mp_plot2 = MapperPlot(g, dim=3, seed=123, iterations=10) + fig2 = mp_plot2.plot_plotly( + colors=data, + agg=np.nanmax, + width=200, + height=200, + title="example", + cmap="jet", + ) + mp_plot2.plot_plotly_update( + fig2, + colors=data, + agg=np.nanmin, + width=300, + height=300, + title="example-updated", + cmap="viridis", + ) + mp_plot3 = MapperPlot(g, dim=2) + mp_plot3.plot_matplotlib(width=300, height=300, colors=data) + mp_plot3.plot_pyvis( + width=512, + height=512, + colors=data, + output_file="network.html", + ) + - def test_two_connected_clusters(self): - data = [ - np.array([0.0, 1.0]), - np.array([1.0, 0.0]), - np.array([0.0, 0.0]), - np.array([1.0, 1.0]), - ] - mp = MapperAlgorithm( - cover=BallCover(1.1, metric="euclidean"), clustering=TrivialClustering() - ) - g = mp.fit_transform(data, data) - mp_plot1 = MapperPlot(g, dim=2, seed=123, iterations=10) - mp_plot1.plot_plotly( - colors=data, - agg=np.nanmax, - width=200, - height=200, - title="example", - cmap="jet", - ) - mp_plot2 = MapperPlot(g, dim=3, seed=123, iterations=10) - fig2 = mp_plot2.plot_plotly( - colors=data, - agg=np.nanmax, - width=200, - height=200, - title="example", - cmap="jet", - ) - mp_plot2.plot_plotly_update( - fig2, - colors=data, - agg=np.nanmin, - width=300, - height=300, - title="example-updated", - cmap="viridis", - ) - mp_plot3 = MapperPlot(g, dim=2) - mp_plot3.plot_matplotlib(width=300, height=300, colors=data) - mp_plot3.plot_pyvis( - width=512, - height=512, - colors=data, - output_file="network.html", - ) +def test_empty_graph(): + empty_graph = nx.Graph() + mapper_plot = MapperPlot(empty_graph, dim=2) + mapper_plot.plot_matplotlib(colors=[]) + mapper_plot.plot_plotly(colors=[]) + mapper_plot.plot_pyvis(colors=[], output_file="tmp.html") - def test_empty_graph(self): - empty_graph = nx.Graph() - mapper_plot = MapperPlot(empty_graph, dim=2) - mapper_plot.plot_matplotlib(colors=[]) - mapper_plot.plot_plotly(colors=[]) - mapper_plot.plot_pyvis(colors=[], output_file="tmp.html") - def test_two_connected_clusters_deprecated(self): - data = [ - np.array([0.0, 1.0]), - np.array([1.0, 0.0]), - np.array([0.0, 0.0]), - np.array([1.0, 1.0]), - ] - mapper_algo = MapperAlgorithm( - cover=BallCover(1.1, metric="euclidean"), clustering=TrivialClustering() - ) - g = mapper_algo.fit_transform(data, data) - mapper_plot_1 = MapperPlot( - g, - dim=2, - seed=123, - iterations=10, - ) - mapper_plot_1.plot_plotly( - colors=data, - agg=np.nanmax, - width=200, - height=200, - title="example", - cmap="jet", - ) - mapper_plot_2 = MapperPlot( - g, - dim=3, - seed=123, - iterations=10, - ) - fig = mapper_plot_2.plot_plotly( - colors=data, - agg=np.nanmax, - width=200, - height=200, - title="example", - cmap="jet", - ) - mapper_plot_2.plot_plotly_update( - fig, - colors=data, - agg=np.nanmin, - width=300, - height=300, - title="example-updated", - cmap="viridis", - ) - mapper_plot_3 = MapperPlot(g, dim=2) - mapper_plot_3.plot_matplotlib( - colors=data, - ) +def test_two_connected_clusters_deprecated(): + data = [ + np.array([0.0, 1.0]), + np.array([1.0, 0.0]), + np.array([0.0, 0.0]), + np.array([1.0, 1.0]), + ] + mapper_algo = MapperAlgorithm( + cover=BallCover(1.1, metric="euclidean"), clustering=TrivialClustering() + ) + g = mapper_algo.fit_transform(data, data) + mapper_plot_1 = MapperPlot( + g, + dim=2, + seed=123, + iterations=10, + ) + mapper_plot_1.plot_plotly( + colors=data, + agg=np.nanmax, + width=200, + height=200, + title="example", + cmap="jet", + ) + mapper_plot_2 = MapperPlot( + g, + dim=3, + seed=123, + iterations=10, + ) + fig = mapper_plot_2.plot_plotly( + colors=data, + agg=np.nanmax, + width=200, + height=200, + title="example", + cmap="jet", + ) + mapper_plot_2.plot_plotly_update( + fig, + colors=data, + agg=np.nanmin, + width=300, + height=300, + title="example-updated", + cmap="viridis", + ) + mapper_plot_3 = MapperPlot(g, dim=2) + mapper_plot_3.plot_matplotlib( + colors=data, + ) diff --git a/tests/test_unit_proximity.py b/tests/test_unit_proximity.py index 333da25..ca54bb3 100644 --- a/tests/test_unit_proximity.py +++ b/tests/test_unit_proximity.py @@ -1,5 +1,4 @@ import math -import unittest import numpy as np @@ -14,75 +13,78 @@ def absdist(x, y): return abs(x - y) -class TestProximity(unittest.TestCase): - - def test_ball_proximity(self): - data = list(range(100)) - cover = BallCover(radius=10, metric=absdist) - cover.fit(data) - for x in data: - result = cover.search(x) - expected = [y for y in data if abs(x - y) < 10] - self.assertEqual(len(expected), len(result)) - - def test_knn_proximity(self): - data = list(range(100)) - cover = KNNCover(neighbors=11, metric=absdist) - cover.fit(data) - for x in range(5, 94): - result = cover.search(x) - expected = [x + i for i in range(-5, 6)] - self.assertEqual(set(expected), set(result)) - - def test_cubical_proximity(self): - m, M = 0, 99 - n = 10 - p = 0.1 - w = (M - m) / (n * (1.0 - p)) - delta = p * w - data = list(range(m, M + 1)) - cover = CubicalCover(n_intervals=n, overlap_frac=p) - cover.fit(data) - for x in data[:-1]: - result = cover.search(x) - i = math.floor((x - m) / (w - delta)) - a_i = m + i * (w - delta) - delta / 2.0 - b_i = m + (i + 1) * (w - delta) + delta / 2.0 - expected = [y for y in data if y > a_i and y < b_i] - for c in result: - self.assertTrue(c in expected) - for c in expected: - self.assertTrue(c in result) - x = data[-1] - last_result = cover.search(x) - self.assertEqual(result, last_result) - - def test_cubical_params(self): - cover = CubicalCover(n_intervals=10, overlap_frac=0.5) - params = cover.get_params() - self.assertEqual(10, params["n_intervals"]) - self.assertEqual(0.5, params["overlap_frac"]) - cover.set_params(n_intervals=5, overlap_frac=0.25) - params = cover.get_params() - self.assertEqual(5, params["n_intervals"]) - self.assertEqual(0.25, params["overlap_frac"]) - - def test_knn_params(self): - cover = KNNCover(neighbors=10, metric="chebyshev") - params = cover.get_params() - self.assertEqual(10, params["neighbors"]) - self.assertEqual("chebyshev", params["metric"]) - cover.set_params(neighbors=5, metric="euclidean") - params = cover.get_params() - self.assertEqual(5, params["neighbors"]) - self.assertEqual("euclidean", params["metric"]) - - def test_ball_params(self): - cover = BallCover(radius=10.0, metric="chebyshev") - params = cover.get_params() - self.assertEqual(10.0, params["radius"]) - self.assertEqual("chebyshev", params["metric"]) - cover.set_params(radius=5.0, metric="euclidean") - params = cover.get_params() - self.assertEqual(5.0, params["radius"]) - self.assertEqual("euclidean", params["metric"]) +def test_ball_proximity(): + data = list(range(100)) + cover = BallCover(radius=10, metric=absdist) + cover.fit(data) + for x in data: + result = cover.search(x) + expected = [y for y in data if abs(x - y) < 10] + assert len(expected) == len(result) + + +def test_knn_proximity(): + data = list(range(100)) + cover = KNNCover(neighbors=11, metric=absdist) + cover.fit(data) + for x in range(5, 94): + result = cover.search(x) + expected = [x + i for i in range(-5, 6)] + assert set(expected) == set(result) + + +def test_cubical_proximity(): + m, M = 0, 99 + n = 10 + p = 0.1 + w = (M - m) / (n * (1.0 - p)) + delta = p * w + data = list(range(m, M + 1)) + cover = CubicalCover(n_intervals=n, overlap_frac=p) + cover.fit(data) + for x in data[:-1]: + result = cover.search(x) + i = math.floor((x - m) / (w - delta)) + a_i = m + i * (w - delta) - delta / 2.0 + b_i = m + (i + 1) * (w - delta) + delta / 2.0 + expected = [y for y in data if y > a_i and y < b_i] + for c in result: + assert c in expected + for c in expected: + assert c in result + x = data[-1] + last_result = cover.search(x) + assert result == last_result + + +def test_cubical_params(): + cover = CubicalCover(n_intervals=10, overlap_frac=0.5) + params = cover.get_params() + assert 10 == params["n_intervals"] + assert 0.5 == params["overlap_frac"] + cover.set_params(n_intervals=5, overlap_frac=0.25) + params = cover.get_params() + assert 5 == params["n_intervals"] + assert 0.25 == params["overlap_frac"] + + +def test_knn_params(): + cover = KNNCover(neighbors=10, metric="chebyshev") + params = cover.get_params() + assert 10 == params["neighbors"] + assert "chebyshev" == params["metric"] + cover.set_params(neighbors=5, metric="euclidean") + params = cover.get_params() + assert 5 == params["neighbors"] + assert "euclidean" == params["metric"] + + +def test_ball_params(): + cover = BallCover(radius=10.0, metric="chebyshev") + params = cover.get_params() + assert 10.0 == params["radius"] + assert "chebyshev" == params["metric"] + cover.set_params(radius=5.0, metric="euclidean") + params = cover.get_params() + assert 5.0 == params["radius"] + assert "euclidean" == params["metric"] diff --git a/tests/test_unit_quickselect.py b/tests/test_unit_quickselect.py index ca091dc..8b8ab0a 100755 --- a/tests/test_unit_quickselect.py +++ b/tests/test_unit_quickselect.py @@ -1,66 +1,67 @@ import random -import unittest import numpy as np from tdamapper.utils.quickselect import partition, quickselect -class TestQuickSelect(unittest.TestCase): +def test_partition(): + n = 1000 + arr = np.array([i for i in range(n)]) + arr_extra = np.array([random.randint(0, n - 1) for i in range(n)]) + for choice in range(n): + h = partition(arr, 0, n, choice, arr_extra) + for i in range(0, h): + assert arr[i] < choice + for i in range(h, n): + assert arr[i] >= choice - def test_partition(self): - n = 1000 - arr = np.array([i for i in range(n)]) - arr_extra = np.array([random.randint(0, n - 1) for i in range(n)]) - for choice in range(n): - h = partition(arr, 0, n, choice, arr_extra) - for i in range(0, h): - self.assertTrue(arr[i] < choice) - for i in range(h, n): - self.assertTrue(arr[i] >= choice) - def test_quickselect_bounds(self): - arr = np.array([0, 1, -1]) - arr_extra = np.array([4, 5, 6]) - quickselect(arr, 1, 2, 0, arr_extra) - self.assertEqual(0, arr[0]) - self.assertEqual(1, arr[1]) - self.assertEqual(-1, arr[2]) - self.assertEqual(4, arr_extra[0]) - self.assertEqual(5, arr_extra[1]) - self.assertEqual(6, arr_extra[2]) +def test_quickselect_bounds(): + arr = np.array([0, 1, -1]) + arr_extra = np.array([4, 5, 6]) + quickselect(arr, 1, 2, 0, arr_extra) + assert 0 == arr[0] + assert 1 == arr[1] + assert -1 == arr[2] + assert 4 == arr_extra[0] + assert 5 == arr_extra[1] + assert 6 == arr_extra[2] - def test_quickselect(self): - n = 1000 - arr = np.array([i for i in range(n)]) - arr_extra = np.array([random.randint(0, n - 1) for i in range(n)]) - for choice in range(n): - quickselect(arr, 0, n, choice, arr_extra) - val = arr[choice] - for i in range(0, choice): - self.assertTrue(arr[i] <= val) - for i in range(choice, n): - self.assertTrue(arr[i] >= val) - def test_partition_tuple(self): - n = 1000 - arr_data = np.array([random.randint(0, n - 1) for i in range(n)]) - arr_ord = np.array(list(range(n))) - for choice in range(n): - h = partition(arr_ord, 0, n, choice, arr_data) - for i in range(0, h): - self.assertTrue(arr_ord[i] < choice) - for i in range(h, n): - self.assertTrue(arr_ord[i] >= choice) +def test_quickselect(): + n = 1000 + arr = np.array([i for i in range(n)]) + arr_extra = np.array([random.randint(0, n - 1) for i in range(n)]) + for choice in range(n): + quickselect(arr, 0, n, choice, arr_extra) + val = arr[choice] + for i in range(0, choice): + assert arr[i] <= val + for i in range(choice, n): + assert arr[i] >= val - def test_quickselect_tuple(self): - n = 1000 - arr_data = np.array([random.randint(0, n - 1) for i in range(n)]) - arr_ord = np.array(list(range(n))) - for choice in range(n): - quickselect(arr_ord, 0, n, choice, arr_data) - val = arr_ord[choice] - for i in range(0, choice): - self.assertTrue(arr_ord[i] <= val) - for i in range(choice, n): - self.assertTrue(arr_ord[i] >= val) + +def test_partition_tuple(): + n = 1000 + arr_data = np.array([random.randint(0, n - 1) for i in range(n)]) + arr_ord = np.array(list(range(n))) + for choice in range(n): + h = partition(arr_ord, 0, n, choice, arr_data) + for i in range(0, h): + assert arr_ord[i] < choice + for i in range(h, n): + assert arr_ord[i] >= choice + + +def test_quickselect_tuple(): + n = 1000 + arr_data = np.array([random.randint(0, n - 1) for i in range(n)]) + arr_ord = np.array(list(range(n))) + for choice in range(n): + quickselect(arr_ord, 0, n, choice, arr_data) + val = arr_ord[choice] + for i in range(0, choice): + assert arr_ord[i] <= val + for i in range(choice, n): + assert arr_ord[i] >= val diff --git a/tests/test_unit_readme.py b/tests/test_unit_readme.py index c551d09..bba7ae2 100644 --- a/tests/test_unit_readme.py +++ b/tests/test_unit_readme.py @@ -1,7 +1,2 @@ -import unittest - - -class TestReadme(unittest.TestCase): - - def test_run(self): - import tests.example +def test_run(): + import tests.example diff --git a/tests/test_unit_sklearn.py b/tests/test_unit_sklearn.py index 0648f63..2f12b99 100644 --- a/tests/test_unit_sklearn.py +++ b/tests/test_unit_sklearn.py @@ -1,5 +1,4 @@ import logging -import unittest import numpy as np from sklearn.utils.estimator_checks import check_estimator @@ -13,36 +12,41 @@ def euclidean(x, y): return np.linalg.norm(x - y) -class TestSklearn(unittest.TestCase): +setup_logging() +logger = logging.getLogger(__name__) - setup_logging() - logger = logging.getLogger(__name__) - def run_tests(self, estimator): - for est, check in check_estimator(estimator, generate_only=True): - # self.logger.info(f'{check}') - check(est) +def run_tests(estimator): + for est, check in check_estimator(estimator, generate_only=True): + # logger.info(f'{check}') + check(est) - def test_trivial(self): - est = MapperAlgorithm() - self.run_tests(est) - def test_ball(self): - est = MapperAlgorithm(cover=BallCover(metric=euclidean)) - self.run_tests(est) +def test_trivial(): + est = MapperAlgorithm() + run_tests(est) - def test_knn(self): - est = MapperAlgorithm(cover=KNNCover(metric=euclidean)) - self.run_tests(est) - def test_cubical(self): - est = MapperAlgorithm(cover=CubicalCover()) - self.run_tests(est) +def test_ball(): + est = MapperAlgorithm(cover=BallCover(metric=euclidean)) + run_tests(est) - def test_clustering_trivial(self): - est = MapperClustering() - self.run_tests(est) - def test_clustering_ball(self): - est = MapperClustering(cover=BallCover(metric=euclidean)) - self.run_tests(est) +def test_knn(): + est = MapperAlgorithm(cover=KNNCover(metric=euclidean)) + run_tests(est) + + +def test_cubical(): + est = MapperAlgorithm(cover=CubicalCover()) + run_tests(est) + + +def test_clustering_trivial(): + est = MapperClustering() + run_tests(est) + + +def test_clustering_ball(): + est = MapperClustering(cover=BallCover(metric=euclidean)) + run_tests(est) diff --git a/tests/test_unit_unionfind.py b/tests/test_unit_unionfind.py index 0e1cebc..d2c42f6 100644 --- a/tests/test_unit_unionfind.py +++ b/tests/test_unit_unionfind.py @@ -1,18 +1,14 @@ -import unittest - from tdamapper.utils.unionfind import UnionFind -class TestUnionFind(unittest.TestCase): - - def test_list(self): - data = [1, 2, 3, 4] - uf = UnionFind(data) - for i in data: - self.assertEqual(i, uf.find(i)) - j = uf.union(1, 2) - self.assertEqual(j, uf.find(1)) - self.assertEqual(j, uf.find(2)) - k = uf.union(3, 4) - self.assertEqual(k, uf.find(3)) - self.assertEqual(k, uf.find(4)) +def test_list(): + data = [1, 2, 3, 4] + uf = UnionFind(data) + for i in data: + assert i == uf.find(i) + j = uf.union(1, 2) + assert j == uf.find(1) + assert j == uf.find(2) + k = uf.union(3, 4) + assert k == uf.find(3) + assert k == uf.find(4) diff --git a/tests/test_unit_vptree.py b/tests/test_unit_vptree.py index e50ec1e..15e22ee 100644 --- a/tests/test_unit_vptree.py +++ b/tests/test_unit_vptree.py @@ -1,5 +1,4 @@ import random -import unittest import numpy as np @@ -15,100 +14,105 @@ def dataset(dim=10, num=1000): return [np.random.rand(dim) for _ in range(num)] -class TestVPTree(unittest.TestCase): - - eps = 0.25 - - neighbors = 5 - - def _test_ball_search(self, data, dist, vpt): - for _ in range(len(data) // 10): - point = random.choice(data) - ball = vpt.ball_search(point, self.eps) - d = get_metric(dist) - near = [y for y in data if d(point, y) < self.eps] - for x in ball: - self.assertTrue(any(d(x, y) == 0.0 for y in near)) - for x in near: - self.assertTrue(any(d(x, y) == 0.0 for y in ball)) - - def _test_knn_search(self, data, dist, vpt): - for _ in range(len(data) // 10): - point = random.choice(data) - neigh = vpt.knn_search(point, self.neighbors) - self.assertEqual(self.neighbors, len(neigh)) - d = get_metric(dist) - dist_neigh = [d(point, y) for y in neigh] - dist_data = [d(point, y) for y in data] - dist_data.sort() - dist_neigh.sort() - self.assertEqual(0.0, dist_data[0]) - self.assertEqual(0.0, dist_neigh[0]) - self.assertEqual(dist_neigh, dist_data[: self.neighbors]) - self.assertEqual(set(dist_neigh), set(dist_data[: self.neighbors])) - - def _test_nn_search(self, data, dist, vpt): +eps = 0.25 + +neighbors = 5 + + +def _test_ball_search(data, dist, vpt): + for _ in range(len(data) // 10): + point = random.choice(data) + ball = vpt.ball_search(point, eps) + d = get_metric(dist) + near = [y for y in data if d(point, y) < eps] + for x in ball: + assert any(d(x, y) == 0.0 for y in near) + for x in near: + assert any(d(x, y) == 0.0 for y in ball) + + +def _test_knn_search(data, dist, vpt): + for _ in range(len(data) // 10): + point = random.choice(data) + neigh = vpt.knn_search(point, neighbors) + assert neighbors == len(neigh) d = get_metric(dist) - for val in data: - neigh = vpt.knn_search(val, 1) - self.assertEqual(0.0, d(val, neigh[0])) - - def _test_vptree(self, builder, data, dist): - vpt = builder( - data, metric=dist, leaf_radius=self.eps, leaf_capacity=self.neighbors - ) - self._test_ball_search(data, dist, vpt) - self._test_knn_search(data, dist, vpt) - self._test_nn_search(data, dist, vpt) - vpt = builder( - data, - metric=dist, - leaf_radius=self.eps, - leaf_capacity=self.neighbors, - pivoting="random", - ) - self._test_ball_search(data, dist, vpt) - self._test_knn_search(data, dist, vpt) - self._test_nn_search(data, dist, vpt) - vpt = builder( - data, - metric=dist, - leaf_radius=self.eps, - leaf_capacity=self.neighbors, - pivoting="furthest", - ) - self._test_ball_search(data, dist, vpt) - self._test_knn_search(data, dist, vpt) - self._test_nn_search(data, dist, vpt) - - def test_vptree_hier_refs(self): - data = dataset() - data_refs = list(range(len(data))) - d = get_metric(distance) - - def dist_refs(i, j): - return d(data[i], data[j]) - - self._test_vptree(HVPT, data_refs, dist_refs) - - def test_vptree_hier_data(self): - data = dataset() - self._test_vptree(HVPT, data, distance) - - def test_vptree_flat_refs(self): - data = dataset() - data_refs = list(range(len(data))) - d = get_metric(distance) - - def dist_refs(i, j): - return d(data[i], data[j]) - - self._test_vptree(FVPT, data_refs, dist_refs) - - def test_vptree_flat_data(self): - data = dataset() - self._test_vptree(FVPT, data, distance) - - def test_ball_tree_data(self): - data = dataset() - self._test_vptree(SkBallTree, data, distance) + dist_neigh = [d(point, y) for y in neigh] + dist_data = [d(point, y) for y in data] + dist_data.sort() + dist_neigh.sort() + assert 0.0 == dist_data[0] + assert 0.0 == dist_neigh[0] + assert dist_neigh == dist_data[:neighbors] + assert set(dist_neigh) == set(dist_data[:neighbors]) + + +def _test_nn_search(data, dist, vpt): + d = get_metric(dist) + for val in data: + neigh = vpt.knn_search(val, 1) + assert 0.0 == d(val, neigh[0]) + + +def _test_vptree(builder, data, dist): + vpt = builder(data, metric=dist, leaf_radius=eps, leaf_capacity=neighbors) + _test_ball_search(data, dist, vpt) + _test_knn_search(data, dist, vpt) + _test_nn_search(data, dist, vpt) + vpt = builder( + data, + metric=dist, + leaf_radius=eps, + leaf_capacity=neighbors, + pivoting="random", + ) + _test_ball_search(data, dist, vpt) + _test_knn_search(data, dist, vpt) + _test_nn_search(data, dist, vpt) + vpt = builder( + data, + metric=dist, + leaf_radius=eps, + leaf_capacity=neighbors, + pivoting="furthest", + ) + _test_ball_search(data, dist, vpt) + _test_knn_search(data, dist, vpt) + _test_nn_search(data, dist, vpt) + + +def test_vptree_hier_refs(): + data = dataset() + data_refs = list(range(len(data))) + d = get_metric(distance) + + def dist_refs(i, j): + return d(data[i], data[j]) + + _test_vptree(HVPT, data_refs, dist_refs) + + +def test_vptree_hier_data(): + data = dataset() + _test_vptree(HVPT, data, distance) + + +def test_vptree_flat_refs(): + data = dataset() + data_refs = list(range(len(data))) + d = get_metric(distance) + + def dist_refs(i, j): + return d(data[i], data[j]) + + _test_vptree(FVPT, data_refs, dist_refs) + + +def test_vptree_flat_data(): + data = dataset() + _test_vptree(FVPT, data, distance) + + +def test_ball_tree_data(): + data = dataset() + _test_vptree(SkBallTree, data, distance)