diff --git a/spp_aggregation/README.rst b/spp_aggregation/README.rst new file mode 100644 index 00000000..c080ad6b --- /dev/null +++ b/spp_aggregation/README.rst @@ -0,0 +1,146 @@ +========================== +OpenSPP Aggregation Engine +========================== + +.. + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! This file is generated by oca-gen-addon-readme !! + !! changes will be overwritten. !! + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! source digest: sha256:9951d094574dd68b1d86ae2167e53c5ccea5240188acf11a2b7f619ede6c5b54 + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +.. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png + :target: https://odoo-community.org/page/development-status + :alt: Beta +.. |badge2| image:: https://img.shields.io/badge/license-LGPL--3-blue.png + :target: http://www.gnu.org/licenses/lgpl-3.0-standalone.html + :alt: License: LGPL-3 +.. |badge3| image:: https://img.shields.io/badge/github-OpenSPP%2FOpenSPP2-lightgray.png?logo=github + :target: https://github.com/OpenSPP/OpenSPP2/tree/19.0/spp_aggregation + :alt: OpenSPP/OpenSPP2 + +|badge1| |badge2| |badge3| + +Unified aggregation engine for computing statistics, breakdowns, and +fairness metrics over scoped registrant populations. Supports multiple +scope types (CEL expressions, areas, spatial queries, explicit IDs) with +access control, caching, and privacy enforcement. 
+ +Key Capabilities +~~~~~~~~~~~~~~~~ + +- Define reusable aggregation scopes: CEL expression, area, area tag, + spatial polygon/buffer, or explicit registrant IDs +- Resolve scopes to registrant sets with union and intersection + operations +- Compute statistics (count, Gini) with extensible statistic registry + supporting CEL variables +- Role-based access control with per-user scope type restrictions, + dimension limits, and area constraints +- Result caching with configurable TTL per scope type and automatic + cleanup +- Privacy enforcement via k-anonymity suppression on computed results +- Convenience methods for area-based, expression-based, fairness, and + distribution queries + +Key Models +~~~~~~~~~~ + ++----------------------------+----------+----------------------------+ +| Model | Type | Description | ++============================+==========+============================+ +| ``spp.aggregation.scope`` | Concrete | Configurable aggregation | +| | | scope definitions | ++----------------------------+----------+----------------------------+ +| ``spp | Concrete | Per-user/group access | +| .aggregation.access.rule`` | | control rules | ++----------------------------+----------+----------------------------+ +| ``spp | Concrete | Persistent cache entries | +| .aggregation.cache.entry`` | | with TTL | ++----------------------------+----------+----------------------------+ +| ``spp.ag | Abstract | Strategy-based scope | +| gregation.scope.resolver`` | | resolution service | ++----------------------------+----------+----------------------------+ +| ``spp.aggregation.cache`` | Abstract | Cache service with TTL and | +| | | cleanup | ++----------------------------+----------+----------------------------+ +| ``spp.aggreg | Abstract | Statistic computation | +| ation.statistic.registry`` | | registry (builtins + CEL) | ++----------------------------+----------+----------------------------+ +| ` | Abstract | Main aggregation entry | +| `spp.aggregation.service`` | | 
point | ++----------------------------+----------+----------------------------+ + +Configuration +~~~~~~~~~~~~~ + +- Aggregation scopes: **Settings > Aggregation > Aggregation Scopes** +- Access rules: **Settings > Aggregation > Access Rules** +- Cache cleanup runs daily via scheduled action + +Security +~~~~~~~~ + ++----------------------------------+----------------------------------+ +| Group | Access | ++==================================+==================================+ +| ``spp_aggr | Read-only access to scopes and | +| egation.group_aggregation_read`` | cache | ++----------------------------------+----------------------------------+ +| ``spp_aggre | Read/write scopes and access | +| gation.group_aggregation_write`` | rules | ++----------------------------------+----------------------------------+ +| ``spp_aggreg | Implied by write group | +| ation.group_aggregation_viewer`` | | ++----------------------------------+----------------------------------+ +| ``spp_aggrega | Implied by viewer group | +| tion.group_aggregation_officer`` | | ++----------------------------------+----------------------------------+ +| ``spp_aggrega | Full access, implied by admin | +| tion.group_aggregation_manager`` | | ++----------------------------------+----------------------------------+ + +Dependencies +~~~~~~~~~~~~ + +``base``, ``spp_cel_domain``, ``spp_area``, ``spp_registry``, +``spp_security``, ``spp_metrics_services`` + +**Table of contents** + +.. contents:: + :local: + +Bug Tracker +=========== + +Bugs are tracked on `GitHub Issues `_. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +`feedback `_. + +Do not contact contributors directly about support or help with technical issues. + +Credits +======= + +Authors +------- + +* OpenSPP.org + +Maintainers +----------- + +.. 
|maintainer-jeremi| image:: https://github.com/jeremi.png?size=40px + :target: https://github.com/jeremi + :alt: jeremi + +Current maintainer: + +|maintainer-jeremi| + +This module is part of the `OpenSPP/OpenSPP2 <https://github.com/OpenSPP/OpenSPP2>`_ project on GitHub. + +You are welcome to contribute. \ No newline at end of file diff --git a/spp_aggregation/__manifest__.py b/spp_aggregation/__manifest__.py index 58f0cb46..582153e2 100644 --- a/spp_aggregation/__manifest__.py +++ b/spp_aggregation/__manifest__.py @@ -8,7 +8,7 @@ "author": "OpenSPP.org", "website": "https://github.com/OpenSPP/OpenSPP2", "license": "LGPL-3", - "development_status": "Alpha", + "development_status": "Beta", "maintainers": ["jeremi"], "depends": [ "base", diff --git a/spp_aggregation/readme/DESCRIPTION.md b/spp_aggregation/readme/DESCRIPTION.md new file mode 100644 index 00000000..d3133de8 --- /dev/null +++ b/spp_aggregation/readme/DESCRIPTION.md @@ -0,0 +1,43 @@ +Unified aggregation engine for computing statistics, breakdowns, and fairness metrics over scoped registrant populations. Supports multiple scope types (CEL expressions, areas, spatial queries, explicit IDs) with access control, caching, and privacy enforcement. 
+ +### Key Capabilities + +- Define reusable aggregation scopes: CEL expression, area, area tag, spatial polygon/buffer, or explicit registrant IDs +- Resolve scopes to registrant sets with union and intersection operations +- Compute statistics (count, Gini) with extensible statistic registry supporting CEL variables +- Role-based access control with per-user scope type restrictions, dimension limits, and area constraints +- Result caching with configurable TTL per scope type and automatic cleanup +- Privacy enforcement via k-anonymity suppression on computed results +- Convenience methods for area-based, expression-based, fairness, and distribution queries + +### Key Models + +| Model | Type | Description | +| ---------------------------------- | -------- | ------------------------------------------------- | +| `spp.aggregation.scope` | Concrete | Configurable aggregation scope definitions | +| `spp.aggregation.access.rule` | Concrete | Per-user/group access control rules | +| `spp.aggregation.cache.entry` | Concrete | Persistent cache entries with TTL | +| `spp.aggregation.scope.resolver` | Abstract | Strategy-based scope resolution service | +| `spp.aggregation.cache` | Abstract | Cache service with TTL and cleanup | +| `spp.aggregation.statistic.registry` | Abstract | Statistic computation registry (builtins + CEL) | +| `spp.aggregation.service` | Abstract | Main aggregation entry point | + +### Configuration + +- Aggregation scopes: **Settings > Aggregation > Aggregation Scopes** +- Access rules: **Settings > Aggregation > Access Rules** +- Cache cleanup runs daily via scheduled action + +### Security + +| Group | Access | +| --------------------------------------- | ----------------------------------------- | +| `spp_aggregation.group_aggregation_read` | Read-only access to scopes and cache | +| `spp_aggregation.group_aggregation_write` | Read/write scopes and access rules | +| `spp_aggregation.group_aggregation_viewer` | Implied by write group | +| 
`spp_aggregation.group_aggregation_officer` | Implied by viewer group | +| `spp_aggregation.group_aggregation_manager` | Full access, implied by admin | + +### Dependencies + +`base`, `spp_cel_domain`, `spp_area`, `spp_registry`, `spp_security`, `spp_metrics_services` diff --git a/spp_aggregation/static/description/icon.png b/spp_aggregation/static/description/icon.png new file mode 100644 index 00000000..c7dbdaaf Binary files /dev/null and b/spp_aggregation/static/description/icon.png differ diff --git a/spp_aggregation/tests/__init__.py b/spp_aggregation/tests/__init__.py index f2499a99..d74726e2 100644 --- a/spp_aggregation/tests/__init__.py +++ b/spp_aggregation/tests/__init__.py @@ -12,3 +12,4 @@ from . import test_scope_builder from . import test_scope_resolver from . import test_statistic_registry +from . import test_coverage diff --git a/spp_aggregation/tests/test_coverage.py b/spp_aggregation/tests/test_coverage.py new file mode 100644 index 00000000..ee1627ad --- /dev/null +++ b/spp_aggregation/tests/test_coverage.py @@ -0,0 +1,491 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. +"""Extended coverage tests for spp_aggregation module. + +Covers edge cases in access rules, cache key generation, +scope resolver, and aggregation service convenience methods. 
+""" + +from unittest.mock import patch + +from odoo.exceptions import ValidationError +from odoo.tests import tagged + +from .common import AggregationTestCase + + +@tagged("post_install", "-at_install") +class TestAccessRuleValidation(AggregationTestCase): + """Tests for spp.aggregation.access.rule constraint and validation edge cases.""" + + def test_constraint_both_user_and_group_raises(self): + """Setting both user_id and group_id must raise ValidationError.""" + with self.assertRaises(ValidationError): + self.env["spp.aggregation.access.rule"].create( + { + "name": "Invalid Rule", + "access_level": "aggregate", + "user_id": self.env.user.id, + "group_id": self.env.ref("base.group_user").id, + } + ) + + def test_constraint_neither_user_nor_group_raises(self): + """Clearing both user_id and group_id must raise ValidationError.""" + rule = self.create_access_rule("aggregate", user_id=self.env.user.id, group_id=False) + with self.assertRaises(ValidationError): + rule.write({"user_id": False}) + + def test_constraint_k_anonymity_below_1_raises(self): + """minimum_k_anonymity < 1 must raise ValidationError.""" + with self.assertRaises(ValidationError): + self.env["spp.aggregation.access.rule"].create( + { + "name": "K Too Low", + "access_level": "aggregate", + "user_id": self.env.user.id, + "minimum_k_anonymity": 0, + } + ) + + def test_constraint_k_anonymity_above_100_raises(self): + """minimum_k_anonymity > 100 must raise ValidationError.""" + with self.assertRaises(ValidationError): + self.env["spp.aggregation.access.rule"].create( + { + "name": "K Too High", + "access_level": "aggregate", + "user_id": self.env.user.id, + "minimum_k_anonymity": 101, + } + ) + + def test_constraint_max_dimensions_negative_raises(self): + """max_group_by_dimensions < 0 must raise ValidationError.""" + with self.assertRaises(ValidationError): + self.env["spp.aggregation.access.rule"].create( + { + "name": "Negative Dims", + "access_level": "aggregate", + "user_id": 
self.env.user.id, + "max_group_by_dimensions": -1, + } + ) + + def test_constraint_max_dimensions_above_10_raises(self): + """max_group_by_dimensions > 10 must raise ValidationError.""" + with self.assertRaises(ValidationError): + self.env["spp.aggregation.access.rule"].create( + { + "name": "Too Many Dims", + "access_level": "aggregate", + "user_id": self.env.user.id, + "max_group_by_dimensions": 11, + } + ) + + def test_check_scope_allowed_inline_not_allowed(self): + """Inline scope dict must be rejected when allow_inline_scopes is False.""" + rule = self.create_access_rule( + "aggregate", + user_id=self.env.user.id, + group_id=False, + allow_inline_scopes=False, + ) + inline_scope = {"scope_type": "area", "area_id": self.area_region.id} + with self.assertRaises(ValidationError): + rule.check_scope_allowed(inline_scope) + + def test_check_scope_allowed_predefined_only_rejects_inline(self): + """allowed_scope_types='predefined' must reject inline dict scopes.""" + rule = self.create_access_rule( + "aggregate", + user_id=self.env.user.id, + group_id=False, + allowed_scope_types="predefined", + allow_inline_scopes=True, + ) + inline_scope = {"scope_type": "area", "area_id": self.area_region.id} + with self.assertRaises(ValidationError): + rule.check_scope_allowed(inline_scope) + + def test_check_scope_allowed_predefined_with_allowed_scope_ids(self): + """predefined mode with allowed_scope_ids must reject unlisted scopes.""" + allowed_scope = self.create_scope( + "area", + area_id=self.area_region.id, + ) + other_scope = self.create_scope( + "area", + name="Other Scope", + area_id=self.area_district.id, + ) + rule = self.create_access_rule( + "aggregate", + user_id=self.env.user.id, + group_id=False, + allowed_scope_types="predefined", + allowed_scope_ids=[(6, 0, [allowed_scope.id])], + ) + # Allowed scope should pass + self.assertTrue(rule.check_scope_allowed(allowed_scope)) + # Other scope should fail + with self.assertRaises(ValidationError): + 
rule.check_scope_allowed(other_scope) + + def test_check_scope_allowed_area_only_rejects_explicit(self): + """allowed_scope_types='area_only' must reject explicit scope type.""" + rule = self.create_access_rule( + "aggregate", + user_id=self.env.user.id, + group_id=False, + allowed_scope_types="area_only", + allow_inline_scopes=True, + ) + explicit_scope = self.create_scope( + "explicit", + explicit_partner_ids=[(6, 0, self.registrants[:5].ids)], + ) + with self.assertRaises(ValidationError): + rule.check_scope_allowed(explicit_scope) + + def test_check_scope_allowed_area_only_allows_area(self): + """allowed_scope_types='area_only' must allow area scope type.""" + rule = self.create_access_rule( + "aggregate", + user_id=self.env.user.id, + group_id=False, + allowed_scope_types="area_only", + allow_inline_scopes=True, + ) + area_scope = self.create_scope( + "area", + area_id=self.area_region.id, + ) + self.assertTrue(rule.check_scope_allowed(area_scope)) + + def test_check_dimensions_allowed_exceeds_max(self): + """Requesting more dimensions than max_group_by_dimensions must raise.""" + rule = self.create_access_rule( + "aggregate", + user_id=self.env.user.id, + group_id=False, + max_group_by_dimensions=1, + ) + with self.assertRaises(ValidationError): + rule.check_dimensions_allowed(["dim_a", "dim_b"]) + + def test_check_dimensions_allowed_disallowed_names(self): + """Requesting dimensions not in allowed_dimension_ids must raise.""" + # Create dimensions + dim_allowed = self.env["spp.demographic.dimension"].create( + { + "name": "test_allowed_dim", + "label": "Allowed Dim", + "dimension_type": "field", + "field_path": "is_group", + } + ) + self.env["spp.demographic.dimension"].create( + { + "name": "test_forbidden_dim", + "label": "Forbidden Dim", + "dimension_type": "field", + "field_path": "is_registrant", + } + ) + rule = self.create_access_rule( + "aggregate", + user_id=self.env.user.id, + group_id=False, + allowed_dimension_ids=[(6, 0, [dim_allowed.id])], + ) 
+ # Allowed dimension should pass + self.assertTrue(rule.check_dimensions_allowed(["test_allowed_dim"])) + # Forbidden dimension should fail + with self.assertRaises(ValidationError): + rule.check_dimensions_allowed(["test_forbidden_dim"]) + + def test_get_effective_rule_user_over_group(self): + """User-specific rule must take precedence over group-based rule.""" + test_user = self.env["res.users"].create( + { + "name": "Priority Test User", + "login": "priority_test_user", + "email": "priority@test.com", + "group_ids": [(4, self.env.ref("base.group_user").id)], + } + ) + # Create group-based rule + self.env["spp.aggregation.access.rule"].create( + { + "name": "Group Rule", + "access_level": "aggregate", + "group_id": self.env.ref("base.group_user").id, + "minimum_k_anonymity": 10, + "sequence": 1, + } + ) + # Create user-specific rule + user_rule = self.env["spp.aggregation.access.rule"].create( + { + "name": "User Rule", + "access_level": "individual", + "user_id": test_user.id, + "minimum_k_anonymity": 3, + "sequence": 5, + } + ) + # User-specific rule should win regardless of sequence + AccessRule = self.env["spp.aggregation.access.rule"] + effective = AccessRule.get_effective_rule_for_user(test_user) + self.assertEqual(effective.id, user_rule.id) + self.assertEqual(effective.access_level, "individual") + + +@tagged("post_install", "-at_install") +class TestCacheServiceKeyGeneration(AggregationTestCase): + """Tests for cache key generation across all scope types.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.cache_service = cls.env["spp.aggregation.cache"] + + def test_scope_key_parts_dict_area(self): + """Cache key parts for dict area scope must include area_id and children flag.""" + scope = {"scope_type": "area", "area_id": 42, "include_child_areas": False} + parts = self.cache_service._get_scope_key_parts(scope) + self.assertIn("area", parts) + self.assertIn("area:42", parts) + self.assertIn("children:False", parts) + + def 
test_scope_key_parts_dict_cel(self): + """Cache key parts for dict CEL scope must include expression and profile.""" + scope = { + "scope_type": "cel", + "cel_expression": "r.age >= 18", + "cel_profile": "registry_groups", + } + parts = self.cache_service._get_scope_key_parts(scope) + self.assertIn("cel", parts) + self.assertIn("expr:r.age >= 18", parts) + self.assertIn("profile:registry_groups", parts) + + def test_scope_key_parts_dict_spatial_polygon(self): + """Cache key parts for dict spatial_polygon scope must include geojson.""" + geojson = '{"type":"Polygon"}' + scope = {"scope_type": "spatial_polygon", "geometry_geojson": geojson} + parts = self.cache_service._get_scope_key_parts(scope) + self.assertIn("spatial_polygon", parts) + self.assertIn(f"geojson:{geojson}", parts) + + def test_scope_key_parts_dict_spatial_buffer(self): + """Cache key parts for dict spatial_buffer scope must include lat, lon, radius.""" + scope = { + "scope_type": "spatial_buffer", + "buffer_center_latitude": 1.5, + "buffer_center_longitude": 2.5, + "buffer_radius_km": 10.0, + } + parts = self.cache_service._get_scope_key_parts(scope) + self.assertIn("spatial_buffer", parts) + self.assertIn("lat:1.5", parts) + self.assertIn("lon:2.5", parts) + self.assertIn("radius:10.0", parts) + + def test_scope_key_parts_dict_area_tag(self): + """Cache key parts for dict area_tag scope must include sorted tag IDs.""" + scope = {"scope_type": "area_tag", "area_tag_ids": [3, 1, 2]} + parts = self.cache_service._get_scope_key_parts(scope) + self.assertIn("area_tag", parts) + self.assertIn("tags:[1, 2, 3]", parts) + + def test_scope_key_parts_dict_explicit(self): + """Cache key parts for dict explicit scope must include sorted partner IDs.""" + scope = {"scope_type": "explicit", "explicit_partner_ids": [10, 5, 8]} + parts = self.cache_service._get_scope_key_parts(scope) + self.assertIn("explicit", parts) + self.assertIn("partners:[5, 8, 10]", parts) + + def test_scope_key_parts_record(self): + 
"""Cache key parts for scope record must include scope_type and scope_id.""" + scope = self.create_scope("area", area_id=self.area_region.id) + parts = self.cache_service._get_scope_key_parts(scope) + self.assertIn("area", parts) + self.assertIn(f"scope_id:{scope.id}", parts) + + def test_get_scope_type_dict(self): + """_get_scope_type with dict must return scope_type value or 'explicit' default.""" + self.assertEqual(self.cache_service._get_scope_type({"scope_type": "cel"}), "cel") + self.assertEqual(self.cache_service._get_scope_type({}), "explicit") + + def test_get_scope_type_record(self): + """_get_scope_type with record must return the record's scope_type.""" + scope = self.create_scope("area", area_id=self.area_region.id) + self.assertEqual(self.cache_service._get_scope_type(scope), "area") + + def test_get_ttl_for_scope_type(self): + """TTL lookup must return correct values and 0 for unknown types.""" + self.assertEqual(self.cache_service._get_ttl_for_scope_type("area"), 3600) + self.assertEqual(self.cache_service._get_ttl_for_scope_type("cel"), 900) + self.assertEqual(self.cache_service._get_ttl_for_scope_type("spatial_polygon"), 0) + self.assertEqual(self.cache_service._get_ttl_for_scope_type("spatial_buffer"), 0) + self.assertEqual(self.cache_service._get_ttl_for_scope_type("area_tag"), 3600) + self.assertEqual(self.cache_service._get_ttl_for_scope_type("explicit"), 1800) + self.assertEqual(self.cache_service._get_ttl_for_scope_type("unknown_type"), 0) + + def test_cron_cleanup_expired(self): + """cron_cleanup_expired on cache.entry must delegate to cache service.""" + cache_entry_model = self.env["spp.aggregation.cache.entry"] + # Should run without error and return an integer + result = cache_entry_model.cron_cleanup_expired() + self.assertIsInstance(result, int) + + def test_store_result_serialization_error(self): + """store_result must return False when result cannot be serialized.""" + scope = self.create_scope("area", area_id=self.area_region.id) + 
with patch("odoo.addons.spp_aggregation.models.service_cache.json.dumps", side_effect=TypeError("bad")): + stored = self.cache_service.store_result(scope, ["count"], [], {"total": 1}) + self.assertFalse(stored) + + +@tagged("post_install", "-at_install") +class TestScopeResolverEdgeCases(AggregationTestCase): + """Tests for scope resolver edge cases and error handling.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.resolver = cls.env["spp.aggregation.scope.resolver"] + + def test_resolve_inline_missing_scope_type(self): + """Inline scope dict without scope_type must return empty list.""" + result = self.resolver.resolve({"area_id": 1}) + self.assertEqual(result, []) + + def test_resolve_inline_unknown_scope_type(self): + """Inline scope dict with unknown scope_type must return empty list.""" + result = self.resolver.resolve({"scope_type": "nonexistent_type"}) + self.assertEqual(result, []) + + def test_resolve_spatial_buffer_inline_missing_params(self): + """Spatial buffer inline with missing params must return empty list.""" + result = self.resolver.resolve( + { + "scope_type": "spatial_buffer", + # Missing latitude, longitude, and radius + } + ) + self.assertEqual(result, []) + + def test_resolve_spatial_polygon_inline_no_geojson(self): + """Spatial polygon inline without geojson must return empty list.""" + result = self.resolver.resolve( + { + "scope_type": "spatial_polygon", + # Missing geometry_geojson + } + ) + self.assertEqual(result, []) + + def test_resolve_record_unknown_scope_type(self): + """Scope record with a scope_type that has no resolver method returns empty.""" + scope = self.create_scope("area", area_id=self.area_region.id) + # Temporarily patch scope_type to something unmapped + with patch.object(type(scope), "scope_type", new_callable=lambda: property(lambda s: "bogus_type")): + result = self.resolver.resolve(scope) + self.assertEqual(result, []) + + def test_resolve_intersect_empty_scopes(self): + """resolve_intersect 
with empty scope list must return empty list.""" + result = self.resolver.resolve_intersect([]) + self.assertEqual(result, []) + + +@tagged("post_install", "-at_install") +class TestAggregationServiceExtended(AggregationTestCase): + """Extended tests for spp.aggregation.service convenience methods and scope resolution.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.service = cls.env["spp.aggregation.service"] + + def test_resolve_scope_dict(self): + """_resolve_scope with dict must return the same dict.""" + scope_dict = {"scope_type": "area", "area_id": self.area_region.id} + result = self.service._resolve_scope(scope_dict) + self.assertIs(result, scope_dict) + + def test_resolve_scope_int(self): + """_resolve_scope with int must return a browse record.""" + scope = self.create_scope("area", area_id=self.area_region.id) + result = self.service._resolve_scope(scope.id) + self.assertEqual(result.id, scope.id) + self.assertEqual(result._name, "spp.aggregation.scope") + + def test_resolve_scope_record(self): + """_resolve_scope with record must return the same record.""" + scope = self.create_scope("area", area_id=self.area_region.id) + result = self.service._resolve_scope(scope) + self.assertIs(result, scope) + + def test_compute_for_area_convenience(self): + """compute_for_area must build inline area scope and compute.""" + result = self.service.compute_for_area( + self.area_district.id, + include_children=False, + ) + self.assertIn("total_count", result) + self.assertGreater(result["total_count"], 0) + self.assertIn("access_level", result) + + def test_compute_for_expression_convenience(self): + """compute_for_expression must build inline CEL scope and compute.""" + result = self.service.compute_for_expression( + cel_expression="r.is_group == false", + profile="registry_individuals", + ) + self.assertIn("total_count", result) + + def test_compute_fairness_convenience(self): + """compute_fairness must resolve scope and delegate to fairness 
service.""" + scope = self.create_scope( + "explicit", + explicit_partner_ids=[(6, 0, self.registrants[:10].ids)], + ) + result = self.service.compute_fairness(scope) + self.assertIn("equity_score", result) + self.assertIn("has_disparity", result) + + def test_compute_distribution_convenience(self): + """compute_distribution must delegate to distribution service.""" + result = self.service.compute_distribution([10, 20, 30, 40, 50]) + self.assertEqual(result["count"], 5) + self.assertEqual(result["total"], 150) + self.assertIn("gini_coefficient", result) + + def test_compute_with_use_cache_false(self): + """compute_aggregation with use_cache=False must skip cache.""" + scope = self.create_scope( + "explicit", + explicit_partner_ids=[(6, 0, self.registrants[:5].ids)], + ) + result = self.service.compute_aggregation(scope, use_cache=False) + self.assertFalse(result["from_cache"]) + self.assertEqual(result["total_count"], 5) + + def test_check_scope_allowed_no_rule(self): + """_check_scope_allowed with no matching rule must pass silently.""" + # Create a user with no access rules + user_no_rule = self.env["res.users"].create( + { + "name": "No Rule User", + "login": "no_rule_user", + "email": "norule@test.com", + } + ) + service = self.service.with_user(user_no_rule) + # Should not raise -- no rule means default allow + scope_dict = {"scope_type": "explicit", "explicit_partner_ids": self.registrants[:3].ids} + service._check_scope_allowed(scope_dict) diff --git a/spp_metrics_core/README.rst b/spp_metrics_core/README.rst new file mode 100644 index 00000000..0c303bfa --- /dev/null +++ b/spp_metrics_core/README.rst @@ -0,0 +1,102 @@ +==================== +OpenSPP Metrics Core +==================== + +.. + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! This file is generated by oca-gen-addon-readme !! + !! changes will be overwritten. !! + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! 
source digest: sha256:081ca8abe85eada71f0faa5c94d74dedc85e51f41225ae0969a971fc99230833 + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +.. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png + :target: https://odoo-community.org/page/development-status + :alt: Beta +.. |badge2| image:: https://img.shields.io/badge/license-LGPL--3-blue.png + :target: http://www.gnu.org/licenses/lgpl-3.0-standalone.html + :alt: License: LGPL-3 +.. |badge3| image:: https://img.shields.io/badge/github-OpenSPP%2FOpenSPP2-lightgray.png?logo=github + :target: https://github.com/OpenSPP/OpenSPP2/tree/19.0/spp_metrics_core + :alt: OpenSPP/OpenSPP2 + +|badge1| |badge2| |badge3| + +Unified metric foundation providing abstract base models for statistics, +simulations, and reporting across OpenSPP modules. + +Key Capabilities +~~~~~~~~~~~~~~~~ + +- Abstract base model for defining reusable metrics with label, unit, + and decimal precision +- Hierarchical metric categories with unique code constraints +- Category tree with parent-child recursion prevention +- Default metric categories for population, coverage, targeting, and + distribution + +Key Models +~~~~~~~~~~ + ++-------------------------+----------+----------------------------+ +| Model | Type | Description | ++=========================+==========+============================+ +| ``spp.metric.base`` | Abstract | Base fields and logic | +| | | inherited by concrete | +| | | metrics | ++-------------------------+----------+----------------------------+ +| ``spp.metric.category`` | Concrete | Hierarchical grouping of | +| | | metrics by domain | ++-------------------------+----------+----------------------------+ + +Configuration +~~~~~~~~~~~~~ + +No configuration required. Default categories are created via data files +on install. + +Dependencies +~~~~~~~~~~~~ + +``base`` + +**Table of contents** + +.. contents:: + :local: + +Bug Tracker +=========== + +Bugs are tracked on `GitHub Issues `_. 
+In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +`feedback <https://github.com/OpenSPP/OpenSPP2/issues/new>`_. + +Do not contact contributors directly about support or help with technical issues. + +Credits +======= + +Authors +------- + +* OpenSPP.org + +Maintainers +----------- + +.. |maintainer-jeremi| image:: https://github.com/jeremi.png?size=40px + :target: https://github.com/jeremi + :alt: jeremi +.. |maintainer-gonzalesedwin1123| image:: https://github.com/gonzalesedwin1123.png?size=40px + :target: https://github.com/gonzalesedwin1123 + :alt: gonzalesedwin1123 + +Current maintainers: + +|maintainer-jeremi| |maintainer-gonzalesedwin1123| + +This module is part of the `OpenSPP/OpenSPP2 <https://github.com/OpenSPP/OpenSPP2>`_ project on GitHub. + +You are welcome to contribute. \ No newline at end of file diff --git a/spp_metrics_core/__manifest__.py b/spp_metrics_core/__manifest__.py index 5a5074f8..590500d6 100644 --- a/spp_metrics_core/__manifest__.py +++ b/spp_metrics_core/__manifest__.py @@ -8,7 +8,7 @@ "author": "OpenSPP.org", "website": "https://github.com/OpenSPP/OpenSPP2", "license": "LGPL-3", - "development_status": "Alpha", + "development_status": "Beta", "maintainers": ["jeremi", "gonzalesedwin1123"], "depends": [ "base", diff --git a/spp_metrics_core/readme/DESCRIPTION.md b/spp_metrics_core/readme/DESCRIPTION.md new file mode 100644 index 00000000..a2deb32c --- /dev/null +++ b/spp_metrics_core/readme/DESCRIPTION.md @@ -0,0 +1,23 @@ +Unified metric foundation providing abstract base models for statistics, simulations, and reporting across OpenSPP modules. 
+ +### Key Capabilities + +- Abstract base model for defining reusable metrics with label, unit, and decimal precision +- Hierarchical metric categories with unique code constraints +- Category tree with parent-child recursion prevention +- Default metric categories for population, coverage, targeting, and distribution + +### Key Models + +| Model | Type | Description | +| --------------------- | -------- | ------------------------------------------------ | +| `spp.metric.base` | Abstract | Base fields and logic inherited by concrete metrics | +| `spp.metric.category` | Concrete | Hierarchical grouping of metrics by domain | + +### Configuration + +No configuration required. Default categories are created via data files on install. + +### Dependencies + +`base` diff --git a/spp_metrics_core/static/description/icon.png b/spp_metrics_core/static/description/icon.png new file mode 100644 index 00000000..c7dbdaaf Binary files /dev/null and b/spp_metrics_core/static/description/icon.png differ diff --git a/spp_metrics_core/tests/__init__.py b/spp_metrics_core/tests/__init__.py index 9acfa486..b9365bce 100644 --- a/spp_metrics_core/tests/__init__.py +++ b/spp_metrics_core/tests/__init__.py @@ -2,4 +2,5 @@ from . import test_metric_base from . import test_metric_category +from . import test_coverage from . import test_migration diff --git a/spp_metrics_core/tests/test_coverage.py b/spp_metrics_core/tests/test_coverage.py new file mode 100644 index 00000000..4ecaa812 --- /dev/null +++ b/spp_metrics_core/tests/test_coverage.py @@ -0,0 +1,207 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. +"""Supplemental tests for spp_metrics_core to reach 95%+ coverage. 
+ +Covers gaps not addressed by existing tests: +- Abstract model registry presence and field definitions +- Category recursion detection (_check_parent_recursion) +- Category deactivation +- Category description field +- Category code uniqueness at DB level +""" + +from odoo.exceptions import ValidationError +from odoo.tests import tagged +from odoo.tests.common import TransactionCase + + +@tagged("post_install", "-at_install") +class TestMetricBaseRegistry(TransactionCase): + """Test the abstract model spp.metric.base without needing concrete inheritors.""" + + def test_abstract_model_in_registry(self): + """Test that spp.metric.base is registered in the Odoo environment.""" + self.assertIn( + "spp.metric.base", + self.env.registry, + "spp.metric.base should be present in the model registry", + ) + + def test_abstract_model_fields_defined(self): + """Test that all expected fields are defined on the abstract model.""" + model_class = self.env.registry["spp.metric.base"] + field_names = set(model_class._fields) + + expected_fields = [ + "name", + "label", + "description", + "unit", + "decimal_places", + "category_id", + "active", + "sequence", + ] + for field_name in expected_fields: + self.assertIn( + field_name, + field_names, + f"Field '{field_name}' should be defined on spp.metric.base", + ) + + def test_abstract_model_field_types(self): + """Test that fields have the correct types.""" + fields_def = self.env.registry["spp.metric.base"]._fields + + self.assertEqual(fields_def["name"].type, "char") + self.assertEqual(fields_def["label"].type, "char") + self.assertEqual(fields_def["description"].type, "text") + self.assertEqual(fields_def["unit"].type, "char") + self.assertEqual(fields_def["decimal_places"].type, "integer") + self.assertEqual(fields_def["category_id"].type, "many2one") + self.assertEqual(fields_def["active"].type, "boolean") + self.assertEqual(fields_def["sequence"].type, "integer") + + def test_abstract_model_required_fields(self): + """Test 
that name and label are required.""" + fields_def = self.env.registry["spp.metric.base"]._fields + + self.assertTrue(fields_def["name"].required, "name should be required") + self.assertTrue(fields_def["label"].required, "label should be required") + self.assertFalse(fields_def["description"].required, "description should not be required") + self.assertFalse(fields_def["unit"].required, "unit should not be required") + + def test_abstract_model_defaults(self): + """Test default values on the abstract model fields.""" + fields_def = self.env.registry["spp.metric.base"]._fields + + # Check defaults exist (Odoo stores defaults differently based on version) + # decimal_places default is 0, active default is True, sequence default is 10 + self.assertIsNotNone(fields_def["decimal_places"].default, "decimal_places should have a default") + self.assertIsNotNone(fields_def["active"].default, "active should have a default") + self.assertIsNotNone(fields_def["sequence"].default, "sequence should have a default") + + def test_abstract_model_is_abstract(self): + """Test that spp.metric.base is an abstract model.""" + model_class = self.env.registry["spp.metric.base"] + self.assertTrue( + model_class._abstract, + "spp.metric.base should be an abstract model", + ) + + +@tagged("post_install", "-at_install") +class TestMetricCategoryCoverage(TransactionCase): + """Supplemental tests for spp.metric.category covering untested paths.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.Category = cls.env["spp.metric.category"] + + def test_recursion_detection(self): + """Test that circular parent relationships raise ValidationError.""" + parent = self.Category.create( + { + "name": "Parent", + "code": "rec_parent", + } + ) + child = self.Category.create( + { + "name": "Child", + "code": "rec_child", + "parent_id": parent.id, + } + ) + + # Attempt to create a cycle: set child as parent of parent + with self.assertRaises(ValidationError): + parent.write({"parent_id": 
child.id}) + + def test_recursion_self_reference(self): + """Test that a category cannot be its own parent.""" + category = self.Category.create( + { + "name": "Self Ref", + "code": "self_ref", + } + ) + + with self.assertRaises(ValidationError): + category.write({"parent_id": category.id}) + + def test_recursion_three_level_cycle(self): + """Test cycle detection across three levels: A -> B -> C -> A.""" + cat_a = self.Category.create({"name": "A", "code": "cycle_a"}) + cat_b = self.Category.create({"name": "B", "code": "cycle_b", "parent_id": cat_a.id}) + cat_c = self.Category.create({"name": "C", "code": "cycle_c", "parent_id": cat_b.id}) + + with self.assertRaises(ValidationError): + cat_a.write({"parent_id": cat_c.id}) + + def test_deactivate_category(self): + """Test that a category can be deactivated.""" + category = self.Category.create( + { + "name": "To Deactivate", + "code": "deactivate_me", + } + ) + self.assertTrue(category.active, "Category should be active by default") + + category.write({"active": False}) + self.assertFalse(category.active, "Category should be inactive after deactivation") + + # Inactive categories should not appear in default searches + visible = self.Category.search([("code", "=", "deactivate_me")]) + self.assertFalse(visible, "Inactive category should not appear in default search") + + # But should appear when searching with active_test=False + all_cats = self.Category.with_context(active_test=False).search([("code", "=", "deactivate_me")]) + self.assertTrue(all_cats, "Inactive category should appear with active_test=False") + + def test_category_with_description(self): + """Test creating a category with a description field.""" + category = self.Category.create( + { + "name": "Described Category", + "code": "described", + "description": "This category contains demographic metrics.", + } + ) + self.assertEqual( + category.description, + "This category contains demographic metrics.", + ) + + def 
test_category_description_empty(self): + """Test that description defaults to falsy when not provided.""" + category = self.Category.create( + { + "name": "No Desc", + "code": "no_desc", + } + ) + self.assertFalse(category.description, "Description should be falsy when not set") + + def test_category_ordering(self): + """Test that _order is sequence then name.""" + self.assertEqual( + self.Category._order, + "sequence, name", + "Categories should be ordered by sequence, then name", + ) + + def test_reactivate_category(self): + """Test that a deactivated category can be reactivated.""" + category = self.Category.create( + { + "name": "Toggle Active", + "code": "toggle_active", + "active": False, + } + ) + self.assertFalse(category.active) + + category.write({"active": True}) + self.assertTrue(category.active, "Category should be active after reactivation") diff --git a/spp_metrics_services/README.rst b/spp_metrics_services/README.rst new file mode 100644 index 00000000..cfca8970 --- /dev/null +++ b/spp_metrics_services/README.rst @@ -0,0 +1,129 @@ +======================== +OpenSPP Metrics Services +======================== + +.. + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! This file is generated by oca-gen-addon-readme !! + !! changes will be overwritten. !! + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! source digest: sha256:0db650ca84d90c37db4c49f6d1633fee269ef124b54b74887facbf530279af49 + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +.. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png + :target: https://odoo-community.org/page/development-status + :alt: Beta +.. |badge2| image:: https://img.shields.io/badge/license-LGPL--3-blue.png + :target: http://www.gnu.org/licenses/lgpl-3.0-standalone.html + :alt: License: LGPL-3 +.. 
|badge3| image:: https://img.shields.io/badge/github-OpenSPP%2FOpenSPP2-lightgray.png?logo=github + :target: https://github.com/OpenSPP/OpenSPP2/tree/19.0/spp_metrics_services + :alt: OpenSPP/OpenSPP2 + +|badge1| |badge2| |badge3| + +Shared service layer providing demographic dimensions, fairness +analysis, distribution statistics, privacy enforcement, and breakdown +computation for OpenSPP aggregation and reporting modules. + +Key Capabilities +~~~~~~~~~~~~~~~~ + +- Define demographic dimensions (gender, age group, disability) as + field-based or CEL expression-based +- Compute fairness metrics with disparity ratios and equity scores +- Calculate distribution statistics including Gini coefficient, Lorenz + curve, percentiles, and standard deviation +- Enforce k-anonymity privacy with complementary suppression to prevent + differencing attacks +- Compute multi-dimensional breakdowns of registrant populations +- Cache dimension evaluations using Odoo ORM cache for performance + +Key Models +~~~~~~~~~~ + ++----------------------------+----------+----------------------------+ +| Model | Type | Description | ++============================+==========+============================+ +| ``s | Concrete | Configurable demographic | +| pp.demographic.dimension`` | | dimensions for breakdowns | ++----------------------------+----------+----------------------------+ +| ``spp | Abstract | ORM-cached dimension | +| .metrics.dimension.cache`` | | evaluation service | ++----------------------------+----------+----------------------------+ +| ``spp.metrics.fairness`` | Abstract | Fairness and equity | +| | | analysis service | ++----------------------------+----------+----------------------------+ +| `` | Abstract | Distribution statistics | +| spp.metrics.distribution`` | | (Gini, Lorenz, | +| | | percentiles) | ++----------------------------+----------+----------------------------+ +| ``spp.metrics.privacy`` | Abstract | K-anonymity enforcement | +| | | with complementary | +| | | 
suppression | ++----------------------------+----------+----------------------------+ +| ``spp.metrics.breakdown`` | Abstract | Multi-dimensional | +| | | population breakdown | +| | | service | ++----------------------------+----------+----------------------------+ + +Configuration +~~~~~~~~~~~~~ + +- Demographic dimensions are managed via **Settings > Aggregation > + Demographic Dimensions** +- Default dimensions for gender and age group are created on install +- K-anonymity threshold defaults to 5 (configurable per access rule) + +Security +~~~~~~~~ + +===================== ========================================== +Group Access +===================== ========================================== +``base.group_user`` Read-only access to demographic dimensions +``base.group_system`` Full CRUD access to demographic dimensions +===================== ========================================== + +Dependencies +~~~~~~~~~~~~ + +``base``, ``spp_cel_domain``, ``spp_area``, ``spp_registry`` + +**Table of contents** + +.. contents:: + :local: + +Bug Tracker +=========== + +Bugs are tracked on `GitHub Issues `_. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +`feedback `_. + +Do not contact contributors directly about support or help with technical issues. + +Credits +======= + +Authors +------- + +* OpenSPP.org + +Maintainers +----------- + +.. |maintainer-jeremi| image:: https://github.com/jeremi.png?size=40px + :target: https://github.com/jeremi + :alt: jeremi + +Current maintainer: + +|maintainer-jeremi| + +This module is part of the `OpenSPP/OpenSPP2 `_ project on GitHub. + +You are welcome to contribute. 
\ No newline at end of file diff --git a/spp_metrics_services/__manifest__.py b/spp_metrics_services/__manifest__.py index 6c12a338..110f1290 100644 --- a/spp_metrics_services/__manifest__.py +++ b/spp_metrics_services/__manifest__.py @@ -8,7 +8,7 @@ "author": "OpenSPP.org", "website": "https://github.com/OpenSPP/OpenSPP2", "license": "LGPL-3", - "development_status": "Alpha", + "development_status": "Beta", "maintainers": ["jeremi"], "depends": [ "base", diff --git a/spp_metrics_services/readme/DESCRIPTION.md b/spp_metrics_services/readme/DESCRIPTION.md new file mode 100644 index 00000000..06be5c58 --- /dev/null +++ b/spp_metrics_services/readme/DESCRIPTION.md @@ -0,0 +1,38 @@ +Shared service layer providing demographic dimensions, fairness analysis, distribution statistics, privacy enforcement, and breakdown computation for OpenSPP aggregation and reporting modules. + +### Key Capabilities + +- Define demographic dimensions (gender, age group, disability) as field-based or CEL expression-based +- Compute fairness metrics with disparity ratios and equity scores +- Calculate distribution statistics including Gini coefficient, Lorenz curve, percentiles, and standard deviation +- Enforce k-anonymity privacy with complementary suppression to prevent differencing attacks +- Compute multi-dimensional breakdowns of registrant populations +- Cache dimension evaluations using Odoo ORM cache for performance + +### Key Models + +| Model | Type | Description | +| ------------------------------ | -------- | ---------------------------------------------------- | +| `spp.demographic.dimension` | Concrete | Configurable demographic dimensions for breakdowns | +| `spp.metrics.dimension.cache` | Abstract | ORM-cached dimension evaluation service | +| `spp.metrics.fairness` | Abstract | Fairness and equity analysis service | +| `spp.metrics.distribution` | Abstract | Distribution statistics (Gini, Lorenz, percentiles) | +| `spp.metrics.privacy` | Abstract | K-anonymity enforcement 
with complementary suppression | +| `spp.metrics.breakdown` | Abstract | Multi-dimensional population breakdown service | + +### Configuration + +- Demographic dimensions are managed via **Settings > Aggregation > Demographic Dimensions** +- Default dimensions for gender and age group are created on install +- K-anonymity threshold defaults to 5 (configurable per access rule) + +### Security + +| Group | Access | +| ------------------ | ----------------------------------------------- | +| `base.group_user` | Read-only access to demographic dimensions | +| `base.group_system`| Full CRUD access to demographic dimensions | + +### Dependencies + +`base`, `spp_cel_domain`, `spp_area`, `spp_registry` diff --git a/spp_metrics_services/static/description/icon.png b/spp_metrics_services/static/description/icon.png new file mode 100644 index 00000000..c7dbdaaf Binary files /dev/null and b/spp_metrics_services/static/description/icon.png differ diff --git a/spp_metrics_services/tests/__init__.py b/spp_metrics_services/tests/__init__.py index e8f6cd1e..0c7a9d58 100644 --- a/spp_metrics_services/tests/__init__.py +++ b/spp_metrics_services/tests/__init__.py @@ -2,3 +2,4 @@ from . import test_services from . import test_dimension_cache +from . import test_coverage diff --git a/spp_metrics_services/tests/test_coverage.py b/spp_metrics_services/tests/test_coverage.py new file mode 100644 index 00000000..5c0ff133 --- /dev/null +++ b/spp_metrics_services/tests/test_coverage.py @@ -0,0 +1,1158 @@ +# Part of OpenSPP. See LICENSE file for full copyright and licensing details. 
+ +import json +from unittest.mock import patch + +from odoo.exceptions import ValidationError +from odoo.tests import tagged +from odoo.tests.common import TransactionCase + + +@tagged("post_install", "-at_install") +class TestDemographicDimension(TransactionCase): + """Test demographic dimension model methods.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + + cls.partner_model = cls.env["res.partner"] + + # Create test registrants (individuals and groups) + cls.group = cls.partner_model.create( + { + "name": "Test Group", + "is_registrant": True, + "is_group": True, + } + ) + cls.individual = cls.partner_model.create( + { + "name": "Test Individual", + "is_registrant": True, + "is_group": False, + } + ) + + cls.dim_model = cls.env["spp.demographic.dimension"] + + # Create field-based dimension for is_group (boolean) + cls.bool_dim = cls.dim_model.create( + { + "name": "test_is_group", + "label": "Is Group", + "dimension_type": "field", + "field_path": "is_group", + "applies_to": "all", + } + ) + + # Create dimension that applies only to individuals + cls.indiv_dim = cls.dim_model.create( + { + "name": "test_indiv_only", + "label": "Individual Only", + "dimension_type": "field", + "field_path": "name", + "applies_to": "individuals", + } + ) + + # Create dimension that applies only to groups + cls.group_dim = cls.dim_model.create( + { + "name": "test_group_only", + "label": "Group Only", + "dimension_type": "field", + "field_path": "name", + "applies_to": "groups", + } + ) + + # ------------------------------------------------------------------------- + # evaluate_for_record + # ------------------------------------------------------------------------- + + def test_evaluate_boolean_field_true(self): + """Boolean field on a group record returns 'true'.""" + result = self.bool_dim.evaluate_for_record(self.group) + self.assertEqual(result, "true") + + def test_evaluate_boolean_field_false(self): + """Boolean field on an individual record returns 
'false'.""" + result = self.bool_dim.evaluate_for_record(self.individual) + self.assertEqual(result, "false") + + def test_evaluate_applicability_individual_dim_on_group(self): + """Individual-only dimension evaluated on a group returns default.""" + result = self.indiv_dim.evaluate_for_record(self.group) + self.assertEqual(result, self.indiv_dim.default_value or "n/a") + + def test_evaluate_applicability_group_dim_on_individual(self): + """Group-only dimension evaluated on an individual returns default.""" + result = self.group_dim.evaluate_for_record(self.individual) + self.assertEqual(result, self.group_dim.default_value or "n/a") + + def test_evaluate_applicability_match_individual(self): + """Individual-only dimension on individual evaluates normally.""" + result = self.indiv_dim.evaluate_for_record(self.individual) + self.assertEqual(result, "Test Individual") + + def test_evaluate_applicability_match_group(self): + """Group-only dimension on group evaluates normally.""" + result = self.group_dim.evaluate_for_record(self.group) + self.assertEqual(result, "Test Group") + + # ------------------------------------------------------------------------- + # _evaluate_field: dot notation + # ------------------------------------------------------------------------- + + def test_evaluate_field_dot_notation_none_intermediate(self): + """Dot notation where intermediate value is falsy returns default or str.""" + dim = self.dim_model.create( + { + "name": "test_dot_none", + "label": "Dot None", + "dimension_type": "field", + "field_path": "parent_id.name", + "applies_to": "all", + } + ) + # individual has no parent_id, so parent_id is a falsy recordset + # _evaluate_field may return default_value or a string representation + result = dim.evaluate_for_record(self.individual) + self.assertIsInstance(result, str) + + def test_evaluate_field_dot_notation_many2one(self): + """Dot notation traversing a Many2one (company_id.name).""" + company = 
self.env["res.company"].search([], limit=1) + partner = self.partner_model.create( + { + "name": "With Company", + "is_registrant": True, + "is_group": False, + "company_id": company.id, + } + ) + dim = self.dim_model.create( + { + "name": "test_company_name", + "label": "Company Name", + "dimension_type": "field", + "field_path": "company_id.name", + "applies_to": "all", + } + ) + result = dim.evaluate_for_record(partner) + self.assertEqual(result, company.name) + + def test_evaluate_field_many2one_with_code(self): + """Many2one field where related record has a 'code' attribute.""" + # company_id is Many2one and res.company doesn't have 'code' normally, + # but we can test through the field that returns a record with .id + dim = self.dim_model.create( + { + "name": "test_company_m2o", + "label": "Company", + "dimension_type": "field", + "field_path": "company_id", + "applies_to": "all", + } + ) + company = self.env["res.company"].search([], limit=1) + partner = self.partner_model.create( + { + "name": "Company Partner", + "is_registrant": True, + "is_group": False, + "company_id": company.id, + } + ) + result = dim.evaluate_for_record(partner) + # company_id has .id, so it's a Many2one. 
res.company has no .code + # so it should return display_name or str(id) + self.assertIn(result, [company.display_name, str(company.id)]) + + def test_evaluate_field_missing_attribute(self): + """Field path with non-existent attribute returns default.""" + dim = self.dim_model.create( + { + "name": "test_missing_attr", + "label": "Missing", + "dimension_type": "field", + "field_path": "nonexistent_field_xyz", + "applies_to": "all", + "default_value": "fallback", + } + ) + result = dim.evaluate_for_record(self.individual) + self.assertEqual(result, "fallback") + + def test_evaluate_field_string_value(self): + """Direct string field returns the string value.""" + dim = self.dim_model.create( + { + "name": "test_name_field", + "label": "Name", + "dimension_type": "field", + "field_path": "name", + "applies_to": "all", + } + ) + result = dim.evaluate_for_record(self.individual) + self.assertEqual(result, "Test Individual") + + # ------------------------------------------------------------------------- + # _evaluate_expression + # ------------------------------------------------------------------------- + + def test_evaluate_expression_cel_service_not_available(self): + """Expression dimension when CEL service not available returns default.""" + dim = self.dim_model.create( + { + "name": "test_expr_no_cel", + "label": "No CEL", + "dimension_type": "expression", + "cel_expression": "true", + "applies_to": "all", + "default_value": "no_cel", + } + ) + # Temporarily make the CEL service unavailable by patching env + with patch.object(type(self.env), "__getitem__", side_effect=KeyError("spp.cel.service")): + result = dim._evaluate_expression(self.individual) + self.assertEqual(result, "no_cel") + + def test_evaluate_expression_returns_value(self): + """Expression-based dimension returns a string result.""" + dim = self.dim_model.create( + { + "name": "test_expr_val", + "label": "Expr Val", + "dimension_type": "expression", + "cel_expression": "true", + "applies_to": "all", 
+ "default_value": "no_cel", + } + ) + # Call evaluate_for_record which dispatches to _evaluate_expression + # If CEL service is available, it evaluates; if not, returns default + result = dim.evaluate_for_record(self.individual) + self.assertIsInstance(result, str) + + def test_evaluate_expression_returns_none(self): + """Expression returning None uses default_value.""" + dim = self.dim_model.create( + { + "name": "test_expr_none", + "label": "None Expr", + "dimension_type": "expression", + "cel_expression": "null_expr", + "applies_to": "all", + "default_value": "unset", + } + ) + cel_service = self.env.get("spp.cel.service") + if cel_service: + with patch.object(type(cel_service), "evaluate_expression", return_value=None): + result = dim._evaluate_expression(self.individual) + self.assertEqual(result, "unset") + + # ------------------------------------------------------------------------- + # evaluate_for_record error handling + # ------------------------------------------------------------------------- + + def test_evaluate_for_record_catches_error(self): + """Errors during evaluation return default_value or 'error'.""" + dim = self.dim_model.create( + { + "name": "test_error_dim", + "label": "Error Dim", + "dimension_type": "field", + "field_path": "name", + "applies_to": "all", + "default_value": "err_default", + } + ) + # Force an error during field evaluation + with patch.object(type(dim), "_evaluate_field", side_effect=AttributeError("test error")): + result = dim.evaluate_for_record(self.individual) + self.assertEqual(result, "err_default") + + def test_evaluate_for_record_error_no_default(self): + """Error with no default_value returns 'error'.""" + dim = self.dim_model.create( + { + "name": "test_error_no_default", + "label": "Error No Default", + "dimension_type": "field", + "field_path": "name", + "applies_to": "all", + "default_value": "", + } + ) + with patch.object(type(dim), "_evaluate_field", side_effect=TypeError("test error")): + result = 
dim.evaluate_for_record(self.individual) + self.assertEqual(result, "error") + + # ------------------------------------------------------------------------- + # get_label_for_value + # ------------------------------------------------------------------------- + + def test_get_label_for_value_with_json_mapping(self): + """Value label lookup from JSON mapping.""" + dim = self.dim_model.create( + { + "name": "test_labels_json", + "label": "Labels", + "dimension_type": "field", + "field_path": "name", + "applies_to": "all", + "value_labels_json": {"M": "Male", "F": "Female"}, + } + ) + self.assertEqual(dim.get_label_for_value("M"), "Male") + self.assertEqual(dim.get_label_for_value("F"), "Female") + + def test_get_label_for_value_no_mapping(self): + """Without mapping, returns the raw value.""" + result = self.bool_dim.get_label_for_value("true") + self.assertEqual(result, "true") + + def test_get_label_for_value_missing_key(self): + """Value not in mapping returns original value.""" + dim = self.dim_model.create( + { + "name": "test_labels_miss", + "label": "Labels Miss", + "dimension_type": "field", + "field_path": "name", + "applies_to": "all", + "value_labels_json": {"A": "Alpha"}, + } + ) + self.assertEqual(dim.get_label_for_value("Z"), "Z") + + def test_get_label_for_value_none_value(self): + """None value looks up 'null' in the mapping.""" + dim = self.dim_model.create( + { + "name": "test_labels_null", + "label": "Labels Null", + "dimension_type": "field", + "field_path": "name", + "applies_to": "all", + "value_labels_json": {"null": "Not Set"}, + } + ) + self.assertEqual(dim.get_label_for_value(None), "Not Set") + + def test_get_label_for_value_string_json(self): + """String JSON in value_labels_json is parsed.""" + dim = self.dim_model.create( + { + "name": "test_labels_str", + "label": "Labels Str", + "dimension_type": "field", + "field_path": "name", + "applies_to": "all", + "value_labels_json": {"X": "Unknown"}, + } + ) + # Force value_labels_json to be a 
string (simulating edge case) + # Odoo Json field normally returns dict, but we test the string branch + with patch.object( + type(dim), + "value_labels_json", + new_callable=lambda: property(lambda self: json.dumps({"X": "Unknown"})), + ): + result = dim.get_label_for_value("X") + self.assertEqual(result, "Unknown") + + def test_get_label_for_value_invalid_string_json(self): + """Invalid string JSON returns the raw value.""" + dim = self.dim_model.create( + { + "name": "test_labels_bad_json", + "label": "Labels Bad", + "dimension_type": "field", + "field_path": "name", + "applies_to": "all", + "value_labels_json": {"placeholder": "val"}, + } + ) + with patch.object( + type(dim), + "value_labels_json", + new_callable=lambda: property(lambda self: "not valid json{{{"), + ): + result = dim.get_label_for_value("test") + self.assertEqual(result, "test") + + # ------------------------------------------------------------------------- + # get_by_name + # ------------------------------------------------------------------------- + + def test_get_by_name_existing(self): + """get_by_name returns a record for an existing dimension.""" + result = self.dim_model.get_by_name("test_is_group") + self.assertTrue(result) + self.assertEqual(result.name, "test_is_group") + + def test_get_by_name_nonexistent(self): + """get_by_name returns empty recordset for non-existing name.""" + result = self.dim_model.get_by_name("definitely_not_a_dimension_xyz") + self.assertFalse(result) + + def test_get_by_name_inactive(self): + """get_by_name does not return inactive dimensions.""" + dim = self.dim_model.create( + { + "name": "test_inactive_dim", + "label": "Inactive", + "dimension_type": "field", + "field_path": "name", + "active": False, + } + ) + result = self.dim_model.get_by_name("test_inactive_dim") + self.assertFalse(result) + # cleanup + dim.unlink() + + # ------------------------------------------------------------------------- + # get_active_dimensions + # 
------------------------------------------------------------------------- + + def test_get_active_dimensions_no_filter(self): + """get_active_dimensions without filter returns all active dims.""" + result = self.dim_model.get_active_dimensions() + # Should include our test dimensions + names = result.mapped("name") + self.assertIn("test_is_group", names) + self.assertIn("test_indiv_only", names) + self.assertIn("test_group_only", names) + + def test_get_active_dimensions_filter_individuals(self): + """Filtering by 'individuals' returns 'all' + 'individuals' dims.""" + result = self.dim_model.get_active_dimensions(applies_to="individuals") + names = result.mapped("name") + self.assertIn("test_is_group", names) # applies_to = 'all' + self.assertIn("test_indiv_only", names) # applies_to = 'individuals' + self.assertNotIn("test_group_only", names) # applies_to = 'groups' + + def test_get_active_dimensions_filter_groups(self): + """Filtering by 'groups' returns 'all' + 'groups' dims.""" + result = self.dim_model.get_active_dimensions(applies_to="groups") + names = result.mapped("name") + self.assertIn("test_is_group", names) # applies_to = 'all' + self.assertNotIn("test_indiv_only", names) # applies_to = 'individuals' + self.assertIn("test_group_only", names) # applies_to = 'groups' + + # ------------------------------------------------------------------------- + # Constraints + # ------------------------------------------------------------------------- + + def test_constraint_field_type_without_field_path(self): + """Field type without field_path raises ValidationError.""" + with self.assertRaises(ValidationError): + self.dim_model.create( + { + "name": "test_no_path", + "label": "No Path", + "dimension_type": "field", + "field_path": "", + } + ) + + def test_constraint_expression_type_without_expression(self): + """Expression type without cel_expression raises ValidationError.""" + with self.assertRaises(ValidationError): + self.dim_model.create( + { + "name": 
"test_no_expr",
                    "label": "No Expr",
                    "dimension_type": "expression",
                    "cel_expression": "",
                }
            )

    def test_constraint_unique_name(self):
        """Duplicate dimension name raises an error (SQL unique constraint)."""
        # Savepoint keeps the test transaction usable after the expected
        # IntegrityError rolls back the failed INSERT.
        with self.assertRaises(Exception):  # noqa: B017
            with self.env.cr.savepoint():
                self.dim_model.create(
                    {
                        "name": "test_is_group",  # already exists
                        "label": "Duplicate",
                        "dimension_type": "field",
                        "field_path": "name",
                    }
                )

    # -------------------------------------------------------------------------
    # write() and unlink() cache invalidation
    # -------------------------------------------------------------------------

    def test_write_clears_cache(self):
        """write() calls clear_dimension_cache."""
        dim = self.dim_model.create(
            {
                "name": "test_cache_write",
                "label": "Cache Write",
                "dimension_type": "field",
                "field_path": "name",
            }
        )
        cache_service = self.env["spp.metrics.dimension.cache"]
        # Patch on the model class so the method called from write() is mocked.
        with patch.object(type(cache_service), "clear_dimension_cache") as mock_clear:
            dim.write({"label": "Updated Label"})
            mock_clear.assert_called()

    def test_unlink_clears_cache(self):
        """unlink() calls clear_dimension_cache."""
        dim = self.dim_model.create(
            {
                "name": "test_cache_unlink",
                "label": "Cache Unlink",
                "dimension_type": "field",
                "field_path": "name",
            }
        )
        cache_service = self.env["spp.metrics.dimension.cache"]
        with patch.object(type(cache_service), "clear_dimension_cache") as mock_clear:
            dim.unlink()
            mock_clear.assert_called()


@tagged("post_install", "-at_install")
class TestFairnessServiceDimensions(TransactionCase):
    """Test fairness service dimension-specific analysis methods."""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

        cls.partner_model = cls.env["res.partner"]
        cls.fairness_service = cls.env["spp.metrics.fairness"]
        cls.dim_model = cls.env["spp.demographic.dimension"]

        # Create test registrants: mix of groups and individuals.
        # browse() with no ids yields empty recordsets used as accumulators.
        cls.individuals = cls.partner_model.browse()
        cls.groups = cls.partner_model.browse()

        for i in range(5):
            ind = cls.partner_model.create(
                {
                    "name": f"Fairness Individual {i}",
                    "is_registrant": True,
                    "is_group": False,
                }
            )
            cls.individuals |= ind

        for i in range(3):
            grp = cls.partner_model.create(
                {
                    "name": f"Fairness Group {i}",
                    "is_registrant": True,
                    "is_group": True,
                }
            )
            cls.groups |= grp

        cls.all_registrants = cls.individuals | cls.groups
        cls.all_ids = cls.all_registrants.ids

        # Create boolean dimension on is_group
        cls.bool_dim = cls.dim_model.create(
            {
                "name": "fairness_is_group",
                "label": "Is Group",
                "dimension_type": "field",
                "field_path": "is_group",
                "applies_to": "all",
            }
        )

    def test_empty_fairness_structure(self):
        """_empty_fairness returns proper dict structure."""
        result = self.fairness_service._empty_fairness()
        self.assertEqual(result["equity_score"], 100.0)
        self.assertFalse(result["has_disparity"])
        self.assertEqual(result["overall_coverage"], 0)
        self.assertEqual(result["total_beneficiaries"], 0)
        self.assertEqual(result["total_population"], 0)
        self.assertIsInstance(result["attributes"], dict)
        self.assertEqual(len(result["attributes"]), 0)

    def test_analyze_boolean_dimension(self):
        """_analyze_boolean_dimension returns groups with true/false keys."""
        beneficiary_set = set(self.individuals.ids[:3])
        base_domain = [("id", "in", self.all_ids)]
        overall_coverage = len(beneficiary_set) / len(self.all_ids)

        result = self.fairness_service._analyze_boolean_dimension(
            self.bool_dim,
            "is_group",
            beneficiary_set,
            base_domain,
            overall_coverage,
            self.partner_model,
        )

        self.assertIsNotNone(result)
        self.assertIn("groups", result)
        self.assertIn("worst_ratio", result)

        # Boolean dimensions are expected to bucket into "true"/"false" keys.
        keys = [g["key"] for g in result["groups"]]
        self.assertIn("true", keys)
        self.assertIn("false", keys)

    def test_analyze_field_dimension_routing_boolean(self):
        """_analyze_field_dimension routes boolean fields correctly."""
        beneficiary_set = set(self.individuals.ids[:2])
        base_domain = [("id", "in", self.all_ids)]
        overall_coverage = len(beneficiary_set) / len(self.all_ids)

        result = self.fairness_service._analyze_field_dimension(
            self.bool_dim,
            beneficiary_set,
            base_domain,
            overall_coverage,
            self.partner_model,
        )

        self.assertIsNotNone(result)
        self.assertEqual(result["attribute"], "fairness_is_group")

    def test_analyze_field_dimension_unsupported_type(self):
        """_analyze_field_dimension with unsupported field type returns None."""
        # 'name' is a char field, which is not handled by _analyze_field_dimension
        dim = self.dim_model.create(
            {
                "name": "fairness_name_dim",
                "label": "Name Dim",
                "dimension_type": "field",
                "field_path": "name",
                "applies_to": "all",
            }
        )
        beneficiary_set = set(self.individuals.ids[:2])
        base_domain = [("id", "in", self.all_ids)]
        overall_coverage = len(beneficiary_set) / len(self.all_ids)

        result = self.fairness_service._analyze_field_dimension(
            dim,
            beneficiary_set,
            base_domain,
            overall_coverage,
            self.partner_model,
        )

        self.assertIsNone(result)

    def test_analyze_field_dimension_missing_field(self):
        """_analyze_field_dimension with non-existent field returns None."""
        dim = self.dim_model.create(
            {
                "name": "fairness_nonexistent",
                "label": "Nonexistent",
                "dimension_type": "field",
                "field_path": "this_field_does_not_exist",
                "applies_to": "all",
            }
        )
        beneficiary_set = set(self.individuals.ids[:2])
        base_domain = [("id", "in", self.all_ids)]
        overall_coverage = len(beneficiary_set) / len(self.all_ids)

        result = self.fairness_service._analyze_field_dimension(
            dim,
            beneficiary_set,
            base_domain,
            overall_coverage,
            self.partner_model,
        )

        self.assertIsNone(result)

    def test_analyze_field_dimension_no_field_path(self):
        """_analyze_field_dimension with empty field_path returns None."""
        dim = self.dim_model.create(
            {
                "name": "fairness_empty_path",
                "label": "Empty Path",
                "dimension_type": "field",
                "field_path": "name",
                "applies_to": "all",
            }
        )
        # Use SQL to bypass the constraint and set field_path to empty
        self.env.cr.execute(
            "UPDATE spp_demographic_dimension SET field_path = NULL WHERE id = %s",
            (dim.id,),
        )
        # Drop the ORM cache so the NULL written via SQL is actually read back.
        dim.invalidate_recordset()

        beneficiary_set = set(self.individuals.ids[:2])
        base_domain = [("id", "in", self.all_ids)]
        overall_coverage = len(beneficiary_set) / len(self.all_ids)

        result = self.fairness_service._analyze_field_dimension(
            dim,
            beneficiary_set,
            base_domain,
            overall_coverage,
            self.partner_model,
        )

        self.assertIsNone(result)

    def test_get_dimensions_all(self):
        """_get_dimensions without names returns all active dimensions."""
        result = self.fairness_service._get_dimensions()
        self.assertTrue(len(result) > 0)
        names = result.mapped("name")
        self.assertIn("fairness_is_group", names)

    def test_get_dimensions_specific_names(self):
        """_get_dimensions with specific names returns only those."""
        result = self.fairness_service._get_dimensions(["fairness_is_group"])
        self.assertEqual(len(result), 1)
        self.assertEqual(result.name, "fairness_is_group")

    def test_get_dimensions_nonexistent_name(self):
        """_get_dimensions with non-existent name returns empty."""
        result = self.fairness_service._get_dimensions(["nonexistent_xyz_999"])
        self.assertEqual(len(result), 0)

    def test_compute_disparity_ratio_zero_coverage(self):
        """_compute_disparity_ratio with zero overall coverage returns 0."""
        result = self.fairness_service._compute_disparity_ratio(0.5, 0)
        self.assertEqual(result, 0.0)

    def test_compute_disparity_ratio_normal(self):
        """_compute_disparity_ratio returns group/overall ratio."""
        result = self.fairness_service._compute_disparity_ratio(0.3, 0.5)
        self.assertAlmostEqual(result, 0.6)

    def test_get_disparity_status_proportional(self):
        """Ratio >= 0.80 is 'proportional'."""
        self.assertEqual(self.fairness_service._get_disparity_status(0.85), "proportional")

    def test_get_disparity_status_low_coverage(self):
        """Ratio between 0.70 and 0.80 is 'low_coverage'."""
        self.assertEqual(self.fairness_service._get_disparity_status(0.75), "low_coverage")

    def test_get_disparity_status_under_represented(self):
        """Ratio < 0.70 is 'under_represented'."""
        self.assertEqual(self.fairness_service._get_disparity_status(0.5), "under_represented")

    def test_analyze_many2one_dimension(self):
        """_analyze_many2one_dimension produces group results."""
        # Use company_id as a many2one field
        company = self.env["res.company"].search([], limit=1)
        for ind in self.individuals:
            ind.write({"company_id": company.id})

        dim = self.dim_model.create(
            {
                "name": "fairness_company",
                "label": "Company",
                "dimension_type": "field",
                "field_path": "company_id",
                "applies_to": "all",
            }
        )
        beneficiary_set = set(self.individuals.ids[:3])
        base_domain = [("id", "in", self.individuals.ids)]
        overall_coverage = len(beneficiary_set) / len(self.individuals.ids)

        result = self.fairness_service._analyze_many2one_dimension(
            dim,
            "company_id",
            beneficiary_set,
            base_domain,
            overall_coverage,
            self.partner_model,
        )

        self.assertIsNotNone(result)
        self.assertIn("groups", result)
        self.assertTrue(len(result["groups"]) > 0)

    def test_analyze_selection_dimension(self):
        """_analyze_selection_dimension with 'type' selection field."""
        dim = self.dim_model.create(
            {
                "name": "fairness_partner_type",
                "label": "Partner Type",
                "dimension_type": "field",
                "field_path": "type",
                "applies_to": "all",
            }
        )
        beneficiary_set = set(self.individuals.ids[:3])
        base_domain = [("id", "in", self.individuals.ids)]
        overall_coverage = len(beneficiary_set) / len(self.individuals.ids)

        result = self.fairness_service._analyze_selection_dimension(
            dim,
            "type",
            beneficiary_set,
            base_domain,
            overall_coverage,
            self.partner_model,
        )

        # May return None if all have same type (no False values filtered out)
        # or a valid result dict
        if result is not None:
            self.assertIn("groups", result)

    def test_compute_fairness_no_population(self):
        """compute_fairness with zero population returns empty fairness."""
        result = self.fairness_service.compute_fairness(
            self.individuals.ids,
            base_domain=[("id", "=", -1)],  # matches nothing
        )
        self.assertEqual(result["equity_score"], 100.0)
        self.assertEqual(result["total_population"], 0)


@tagged("post_install", "-at_install")
class TestPrivacyServiceExtended(TransactionCase):
    """Test privacy service additional methods."""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.privacy_service = cls.env["spp.metrics.privacy"]

    # -------------------------------------------------------------------------
    # _find_cells_in_slice
    # -------------------------------------------------------------------------

    def test_find_cells_in_slice_basic(self):
        """_find_cells_in_slice returns cells sharing same dim value."""
        # Breakdown keys are pipe-joined dimension values: "<dim0>|<dim1>".
        breakdown = {
            "male|urban": {"count": 10},
            "male|rural": {"count": 20},
            "female|urban": {"count": 15},
            "female|rural": {"count": 25},
        }
        result = self.privacy_service._find_cells_in_slice("male|urban", 0, breakdown)
        self.assertIn("male|urban", result)
        self.assertIn("male|rural", result)
        self.assertNotIn("female|urban", result)
        self.assertNotIn("female|rural", result)

    def test_find_cells_in_slice_second_dim(self):
        """_find_cells_in_slice on dim_idx=1 groups by second dimension."""
        breakdown = {
            "male|urban": {"count": 10},
            "male|rural": {"count": 20},
            "female|urban": {"count": 15},
            "female|rural": {"count": 25},
        }
        result = self.privacy_service._find_cells_in_slice("male|urban", 1, breakdown)
        self.assertIn("male|urban", result)
        self.assertIn("female|urban", result)
        self.assertNotIn("male|rural", result)
        self.assertNotIn("female|rural", result)
+ def test_find_cells_in_slice_dim_idx_out_of_range(self): + """_find_cells_in_slice with out-of-range dim_idx returns empty.""" + breakdown = { + "male|urban": {"count": 10}, + "female|urban": {"count": 15}, + } + result = self.privacy_service._find_cells_in_slice("male|urban", 5, breakdown) + self.assertEqual(result, []) + + # ------------------------------------------------------------------------- + # _find_dimension_siblings + # ------------------------------------------------------------------------- + + def test_find_dimension_siblings_basic(self): + """_find_dimension_siblings returns siblings sharing dim value.""" + breakdown = { + "male|urban": {"count": 10}, + "male|rural": {"count": 20}, + "female|urban": {"count": 15}, + } + result = self.privacy_service._find_dimension_siblings("male|urban", 0, breakdown) + self.assertIn("male|rural", result) + self.assertNotIn("male|urban", result) # excludes self + self.assertNotIn("female|urban", result) + + def test_find_dimension_siblings_dim_idx_out_of_range(self): + """_find_dimension_siblings with out-of-range dim_idx returns empty.""" + breakdown = { + "male|urban": {"count": 10}, + "male|rural": {"count": 20}, + } + result = self.privacy_service._find_dimension_siblings("male|urban", 5, breakdown) + self.assertEqual(result, []) + + def test_find_dimension_siblings_different_dim_count(self): + """_find_dimension_siblings ignores keys with different dim count.""" + breakdown = { + "male|urban": {"count": 10}, + "male|rural": {"count": 20}, + "male": {"count": 30}, # single dimension - should be ignored + } + result = self.privacy_service._find_dimension_siblings("male|urban", 0, breakdown) + self.assertIn("male|rural", result) + self.assertNotIn("male", result) + + # ------------------------------------------------------------------------- + # validate_access_level + # ------------------------------------------------------------------------- + + def test_validate_access_level_without_access_rule_model(self): + 
"""validate_access_level defaults to 'aggregate' when model unavailable.""" + with patch.object(type(self.env), "get", return_value=None): + result = self.privacy_service.validate_access_level() + self.assertEqual(result, "aggregate") + + def test_validate_access_level_default_user(self): + """validate_access_level uses current user by default.""" + # Without the access rule model installed, should default to aggregate + result = self.privacy_service.validate_access_level() + self.assertEqual(result, "aggregate") + + def test_validate_access_level_explicit_user(self): + """validate_access_level with explicit user parameter.""" + user = self.env.user + result = self.privacy_service.validate_access_level(user=user) + self.assertEqual(result, "aggregate") + + # ------------------------------------------------------------------------- + # get_k_threshold + # ------------------------------------------------------------------------- + + def test_get_k_threshold_without_access_rule_model(self): + """get_k_threshold defaults to DEFAULT_K_THRESHOLD when model unavailable.""" + with patch.object(type(self.env), "get", return_value=None): + result = self.privacy_service.get_k_threshold() + self.assertEqual(result, self.privacy_service.DEFAULT_K_THRESHOLD) + + def test_get_k_threshold_default(self): + """get_k_threshold returns default threshold.""" + result = self.privacy_service.get_k_threshold() + self.assertEqual(result, 5) + + def test_get_k_threshold_explicit_user(self): + """get_k_threshold with explicit user parameter.""" + user = self.env.user + result = self.privacy_service.get_k_threshold(user=user, context="api") + self.assertEqual(result, 5) + + # ------------------------------------------------------------------------- + # _find_siblings + # ------------------------------------------------------------------------- + + def test_find_siblings_single_dimension(self): + """_find_siblings with single dimension finds all other keys.""" + breakdown = { + "male": 
{"count": 10}, + "female": {"count": 20}, + "other": {"count": 5}, + } + result = self.privacy_service._find_siblings("male", breakdown) + self.assertIn("female", result) + self.assertIn("other", result) + self.assertNotIn("male", result) + + def test_find_siblings_multi_dimension(self): + """_find_siblings with multi dimension finds keys differing by one part.""" + breakdown = { + "male|urban": {"count": 10}, + "male|rural": {"count": 20}, + "female|urban": {"count": 15}, + "female|rural": {"count": 25}, + } + result = self.privacy_service._find_siblings("male|urban", breakdown) + # Differs by exactly one part + self.assertIn("male|rural", result) # differs in 2nd + self.assertIn("female|urban", result) # differs in 1st + self.assertNotIn("female|rural", result) # differs in both + + # ------------------------------------------------------------------------- + # _get_smallest_sibling + # ------------------------------------------------------------------------- + + def test_get_smallest_sibling(self): + """_get_smallest_sibling returns key with smallest count.""" + breakdown = { + "male": {"count": 10}, + "female": {"count": 3}, + "other": {"count": 7}, + } + result = self.privacy_service._get_smallest_sibling(["male", "female", "other"], breakdown) + self.assertEqual(result, "female") + + def test_get_smallest_sibling_empty(self): + """_get_smallest_sibling with empty list returns None.""" + result = self.privacy_service._get_smallest_sibling([], {}) + self.assertIsNone(result) + + def test_get_smallest_sibling_non_dict_cell(self): + """_get_smallest_sibling skips non-dict cells.""" + breakdown = { + "male": "not a dict", + "female": {"count": 3}, + } + result = self.privacy_service._get_smallest_sibling(["male", "female"], breakdown) + self.assertEqual(result, "female") + + def test_get_smallest_sibling_suppressed_count(self): + """_get_smallest_sibling skips cells with non-int count (already suppressed).""" + breakdown = { + "male": {"count": "<5", "suppressed": 
True}, + "female": {"count": 8}, + } + result = self.privacy_service._get_smallest_sibling(["male", "female"], breakdown) + self.assertEqual(result, "female") + + # ------------------------------------------------------------------------- + # is_count_suppressed / format_suppressed_count + # ------------------------------------------------------------------------- + + def test_is_count_suppressed_below_threshold(self): + """Count below threshold is suppressed.""" + self.assertTrue(self.privacy_service.is_count_suppressed(3)) + + def test_is_count_suppressed_at_threshold(self): + """Count at threshold is not suppressed.""" + self.assertFalse(self.privacy_service.is_count_suppressed(5)) + + def test_is_count_suppressed_above_threshold(self): + """Count above threshold is not suppressed.""" + self.assertFalse(self.privacy_service.is_count_suppressed(10)) + + def test_is_count_suppressed_non_int(self): + """Non-int count is not suppressed.""" + self.assertFalse(self.privacy_service.is_count_suppressed("<5")) + + def test_is_count_suppressed_custom_threshold(self): + """Custom threshold works.""" + self.assertTrue(self.privacy_service.is_count_suppressed(8, k_threshold=10)) + self.assertFalse(self.privacy_service.is_count_suppressed(12, k_threshold=10)) + + def test_format_suppressed_count_not_suppressed(self): + """Non-suppressed count returns str(count).""" + result = self.privacy_service.format_suppressed_count(10) + self.assertEqual(result, "10") + + def test_format_suppressed_count_less_than(self): + """Default display mode is less_than.""" + result = self.privacy_service.format_suppressed_count(3) + self.assertEqual(result, "<5") + + def test_format_suppressed_count_null(self): + """Null display mode returns None.""" + result = self.privacy_service.format_suppressed_count(3, display_mode="null") + self.assertIsNone(result) + + def test_format_suppressed_count_asterisk(self): + """Asterisk display mode returns '*'.""" + result = 
self.privacy_service.format_suppressed_count(3, display_mode="asterisk") + self.assertEqual(result, "*") + + # ------------------------------------------------------------------------- + # _apply_k_anonymity complementary suppression + # ------------------------------------------------------------------------- + + def test_apply_k_anonymity_empty_breakdown(self): + """Empty breakdown passes through.""" + result = self.privacy_service._apply_k_anonymity({}, 5) + self.assertEqual(result, {}) + + def test_apply_k_anonymity_multi_dim_complementary(self): + """Multi-dimensional complementary suppression works.""" + breakdown = { + "male|urban": {"count": 2}, # below threshold + "male|rural": {"count": 50}, + "female|urban": {"count": 40}, + "female|rural": {"count": 60}, + } + result = self.privacy_service._apply_k_anonymity(breakdown, 5) + # male|urban should be suppressed (primary) + self.assertTrue(result["male|urban"].get("suppressed")) + # At least one sibling should also be suppressed (complementary) + suppressed_count = sum(1 for k, v in result.items() if isinstance(v, dict) and v.get("suppressed")) + self.assertGreaterEqual(suppressed_count, 2) + + def test_apply_k_anonymity_preserves_labels(self): + """Suppression preserves label/display_name metadata.""" + breakdown = { + "male": {"count": 2, "label": "Male"}, + "female": {"count": 50, "label": "Female"}, + } + result = self.privacy_service._apply_k_anonymity(breakdown, 5) + self.assertEqual(result["male"].get("label"), "Male") + + +@tagged("post_install", "-at_install") +class TestDistributionServiceEdgeCases(TransactionCase): + """Test distribution service edge cases.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.dist_service = cls.env["spp.metrics.distribution"] + + def test_empty_distribution(self): + """_empty_distribution returns zeroed structure.""" + result = self.dist_service._empty_distribution() + self.assertEqual(result["count"], 0) + self.assertEqual(result["total"], 0) + 
self.assertEqual(result["gini_coefficient"], 0) + self.assertEqual(result["lorenz_deciles"], []) + self.assertIn("percentiles", result) + self.assertEqual(result["percentiles"]["p50"], 0) + + def test_single_value(self): + """Distribution with a single value computes correctly.""" + result = self.dist_service.compute_distribution([100]) + self.assertEqual(result["count"], 1) + self.assertEqual(result["minimum"], 100) + self.assertEqual(result["maximum"], 100) + self.assertEqual(result["mean"], 100) + self.assertEqual(result["median"], 100) + self.assertEqual(result["standard_deviation"], 0) + self.assertEqual(result["gini_coefficient"], 0) + + def test_two_values_even_median(self): + """Distribution with two values computes median correctly.""" + result = self.dist_service.compute_distribution([100, 200]) + self.assertEqual(result["median"], 150) + + def test_all_zeros_gini(self): + """Gini coefficient for all-zero amounts is 0.""" + result = self.dist_service.compute_distribution([0, 0, 0]) + self.assertEqual(result["gini_coefficient"], 0) + + def test_perfect_inequality_gini(self): + """Gini coefficient approaches 1 for extreme inequality.""" + # One person has everything, rest have 0 + amounts = [0] * 99 + [10000] + result = self.dist_service.compute_distribution(amounts) + self.assertGreater(result["gini_coefficient"], 0.9) + + def test_lorenz_deciles_count(self): + """Lorenz deciles always have 10 entries.""" + amounts = list(range(1, 101)) + result = self.dist_service.compute_distribution(amounts) + self.assertEqual(len(result["lorenz_deciles"]), 10) + # Last decile should be 100% + self.assertEqual(result["lorenz_deciles"][-1]["population_share"], 100) + self.assertAlmostEqual(result["lorenz_deciles"][-1]["income_share"], 100.0) + + def test_percentile_edge(self): + """Percentile computation on small dataset.""" + result = self.dist_service.compute_distribution([10, 20]) + self.assertEqual(result["percentiles"]["p50"], 15.0)